use std::sync::Arc;
use epics_base_rs::server::database::{ExternalPvResolver, LinkSet, PvDatabase};
use epics_base_rs::types::EpicsValue;
use epics_pva_rs::pvdata::{PvField, ScalarValue};
use super::config::{LinkDirection, PvaLinkConfig};
use super::link::{PvaLink, PvaLinkError, PvaLinkResult};
use super::registry::PvaLinkRegistry;
fn block_in_place_or_warn<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
use tokio::runtime::{Handle, RuntimeFlavor};
if let Ok(handle) = Handle::try_current() {
match handle.runtime_flavor() {
RuntimeFlavor::MultiThread => tokio::task::block_in_place(f),
_ => f(),
}
} else {
f()
}
}
#[derive(Clone)]
pub struct PvaLinkResolver {
registry: Arc<PvaLinkRegistry>,
handle: tokio::runtime::Handle,
reads: Arc<std::sync::atomic::AtomicU64>,
enabled: Arc<std::sync::atomic::AtomicBool>,
link_options: Arc<parking_lot::RwLock<std::collections::HashMap<String, PvaLinkConfig>>>,
out_link_options: Arc<parking_lot::RwLock<std::collections::HashMap<String, PvaLinkConfig>>>,
db: Arc<parking_lot::RwLock<Option<PvDatabase>>>,
scan_targets: Arc<parking_lot::RwLock<std::collections::HashMap<String, ScanFanout>>>,
forwarders: Arc<parking_lot::Mutex<std::collections::HashSet<String>>>,
#[cfg(feature = "qsrv")]
qsrv: Arc<parking_lot::RwLock<Option<Arc<crate::qsrv::BridgeProvider>>>>,
}
#[derive(Default)]
struct ScanFanout {
records: Vec<ScanTarget>,
}
struct ScanTarget {
record: String,
always: bool,
monorder: i32,
atomic: bool,
passive_only: bool,
field: String,
}
impl PvaLinkResolver {
pub fn new(handle: tokio::runtime::Handle) -> Self {
Self {
registry: Arc::new(PvaLinkRegistry::new()),
handle,
reads: Arc::new(std::sync::atomic::AtomicU64::new(0)),
enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
link_options: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
out_link_options: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
db: Arc::new(parking_lot::RwLock::new(None)),
scan_targets: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
forwarders: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())),
#[cfg(feature = "qsrv")]
qsrv: Arc::new(parking_lot::RwLock::new(None)),
}
}
pub fn attach_database(&self, db: PvDatabase) {
*self.db.write() = Some(db);
}
#[cfg(feature = "qsrv")]
pub fn attach_qsrv_provider(&self, provider: Arc<crate::qsrv::BridgeProvider>) {
*self.qsrv.write() = Some(provider);
}
#[cfg(feature = "qsrv")]
pub fn with_qsrv_provider(self, provider: Arc<crate::qsrv::BridgeProvider>) -> Self {
self.attach_qsrv_provider(provider);
self
}
pub fn set_enabled(&self, on: bool) {
self.enabled.store(on, std::sync::atomic::Ordering::Relaxed);
}
pub fn is_enabled(&self) -> bool {
self.enabled.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn open(&self, pv_name: &str) -> PvaLinkResult<Arc<PvaLink>> {
self.registry.get_or_open(self.inp_cfg_for(pv_name)).await
}
pub async fn open_link(&self, link_string: &str) -> PvaLinkResult<Arc<PvaLink>> {
self.open_link_inner(link_string, None).await
}
pub async fn open_link_for_record(
&self,
link_string: &str,
record: &str,
) -> PvaLinkResult<Arc<PvaLink>> {
self.open_link_inner(link_string, Some(record.to_string()))
.await
}
async fn open_link_inner(
&self,
link_string: &str,
record: Option<String>,
) -> PvaLinkResult<Arc<PvaLink>> {
let cfg = PvaLinkConfig::parse(link_string, LinkDirection::Inp)?;
let cfg = PvaLinkConfig {
monitor: true,
..cfg
};
let pv_name = cfg.pv_name.clone();
if cfg.local {
#[cfg(feature = "qsrv")]
let mut is_local = self.is_local_in_db(&pv_name).await;
#[cfg(not(feature = "qsrv"))]
let is_local = self.is_local_in_db(&pv_name).await;
#[cfg(feature = "qsrv")]
if !is_local {
let provider = self.qsrv.read().clone();
if let Some(provider) = provider {
is_local = provider.hosts_pv(&pv_name).await;
}
}
if !is_local {
return Err(PvaLinkError::NotLocal(pv_name));
}
}
let full_key = strip_scheme(link_string).unwrap_or(link_string).to_string();
self.link_options.write().insert(full_key, cfg.clone());
if let Some(rec) = record {
if cfg.scan_on_update {
self.scan_targets
.write()
.entry(pv_name.clone())
.or_default()
.records
.push(ScanTarget {
record: rec,
always: cfg.always,
monorder: cfg.monorder,
atomic: cfg.atomic,
passive_only: cfg.scan_on_passive,
field: cfg.field.clone(),
});
}
}
let link = self.registry.get_or_open(cfg).await?;
self.spawn_notify_forwarder(&pv_name, &link);
Ok(link)
}
fn spawn_notify_forwarder(&self, pv_name: &str, link: &Arc<PvaLink>) {
{
let mut started = self.forwarders.lock();
if started.contains(pv_name) {
return;
}
started.insert(pv_name.to_string());
}
let Some(rx) = link.take_notify_rx() else {
self.forwarders.lock().remove(pv_name);
return;
};
let pv_name = pv_name.to_string();
let scan_targets = self.scan_targets.clone();
let db = self.db.clone();
self.handle
.spawn(run_notify_forwarder(pv_name, rx, scan_targets, db));
}
async fn is_local_in_db(&self, pv_name: &str) -> bool {
let db = self.db.read().clone();
match db {
Some(db) => {
db.get_record_no_resolve(pv_name).await.is_some()
|| db.find_pv(pv_name).await.is_some()
}
None => false,
}
}
fn inp_cfg_for(&self, full: &str) -> PvaLinkConfig {
let opts = self.link_options.read();
if let Some(cfg) = opts.get(full) {
return PvaLinkConfig {
monitor: true,
..cfg.clone()
};
}
let bare = strip_query(full);
if bare != full {
if let Some(cfg) = opts.get(bare) {
return PvaLinkConfig {
monitor: true,
..cfg.clone()
};
}
}
default_inp_cfg(bare)
}
fn out_cfg_for(&self, full: &str) -> PvaLinkConfig {
let opts = self.out_link_options.read();
if let Some(cfg) = opts.get(full) {
return cfg.clone();
}
let bare = strip_query(full);
if bare != full {
if let Some(cfg) = opts.get(bare) {
return cfg.clone();
}
}
PvaLinkConfig::defaults_for(bare, LinkDirection::Out)
}
pub async fn open_out_link(&self, link_string: &str) -> PvaLinkResult<Arc<PvaLink>> {
let cfg = PvaLinkConfig::parse(link_string, LinkDirection::Out)?;
let full_key = strip_scheme(link_string).unwrap_or(link_string).to_string();
self.out_link_options.write().insert(full_key, cfg.clone());
self.registry.get_or_open(cfg).await
}
pub fn read_count(&self) -> u64 {
self.reads.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn link_count(&self) -> usize {
self.registry.len()
}
pub fn link_alarm_severity(&self, pv_name: &str) -> Option<i32> {
let full = strip_scheme(pv_name)?;
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&self.link_options, full);
}
let sevr = self.inp_cfg_for(full).sevr;
self.registry
.try_get_any(bare, LinkDirection::Inp)?
.link_alarm_severity_with(sevr)
}
pub async fn wait_for_link_connected(
&self,
pv_name: &str,
timeout: std::time::Duration,
) -> bool {
let link = match self.open(pv_name).await {
Ok(l) => l,
Err(_) => return false,
};
let deadline = std::time::Instant::now() + timeout;
loop {
if link.read().await.is_ok() {
return true;
}
if std::time::Instant::now() >= deadline {
return false;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
pub fn build_resolver(self) -> ExternalPvResolver {
let resolver = self;
Arc::new(move |name: &str| -> Option<EpicsValue> {
if !resolver.is_enabled() {
return None;
}
let full = match name.strip_prefix("pva://") {
Some(stripped) => stripped,
None => {
if name.starts_with("ca://") {
return None;
}
name
}
};
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&resolver.link_options, full);
}
let cfg = resolver.inp_cfg_for(full);
if let Some(link) = resolver.registry.try_get_any(bare, LinkDirection::Inp)
&& let Some(value) = link.try_read_cached_with_field(&cfg.field)
{
resolver
.reads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return pvfield_to_epics_value(&value);
}
let field = cfg.field.clone();
let (link, value) = block_in_place_or_warn(|| {
resolver.handle.block_on(async {
let link = resolver.registry.get_or_open(cfg).await.ok()?;
let value = link.read_with_field(&field).await.ok()?;
Some((link, value))
})
})?;
let _ = link;
resolver
.reads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
pvfield_to_epics_value(&value)
})
}
}
pub async fn install_pvalink_resolver(
db: &Arc<PvDatabase>,
handle: tokio::runtime::Handle,
) -> PvaLinkResolver {
let resolver = PvaLinkResolver::new(handle);
resolver.attach_database((**db).clone());
db.set_external_resolver(resolver.clone().build_resolver())
.await;
db.register_link_set("pva", Arc::new(resolver.clone()))
.await;
use epics_base_rs::server::record::ParsedLink;
for record_name in db.all_record_names().await {
for (field_name, _raw, parsed) in db.record_link_fields(&record_name).await {
let ParsedLink::Pva(ref s) = parsed else {
continue;
};
if !s.contains('?') {
continue;
}
let link_str = format!("pva://{s}");
if field_name == "OUT" {
let _ = resolver.open_out_link(&link_str).await;
} else {
let _ = resolver.open_link_for_record(&link_str, &record_name).await;
}
}
}
resolver
}
type ScanTargetMap = Arc<parking_lot::RwLock<std::collections::HashMap<String, ScanFanout>>>;
async fn run_notify_forwarder(
pv_name: String,
mut rx: tokio::sync::mpsc::Receiver<PvField>,
scan_targets: ScanTargetMap,
db: Arc<parking_lot::RwLock<Option<PvDatabase>>>,
) {
let mut last: std::collections::HashMap<(String, String), PvField> =
std::collections::HashMap::new();
while let Some(value) = rx.recv().await {
let mut targets: Vec<(String, bool, i32, bool, bool, String)> =
match scan_targets.read().get(&pv_name) {
Some(fanout) => fanout
.records
.iter()
.map(|t| {
(
t.record.clone(),
t.always,
t.monorder,
t.atomic,
t.passive_only,
t.field.clone(),
)
})
.collect(),
None => Vec::new(),
};
targets.sort_by_key(|(_, _, order, atomic, _, _)| (!*atomic, *order));
let Some(db_handle) = db.read().clone() else {
continue;
};
let atomic_records: Vec<String> = targets
.iter()
.filter(|(_, _, _, atomic, _, _)| *atomic)
.map(|(record, _, _, _, _, _)| record.clone())
.collect();
let mut atomic_epoch = if atomic_records.is_empty() {
None
} else {
Some(db_handle.lock_records(&atomic_records).await)
};
for (record, always, _order, atomic, passive_only, field) in &targets {
if !*atomic {
atomic_epoch = None;
}
let changed = {
let leaf = extract_leaf(&value, field);
let key = (record.clone(), field.clone());
let prev = last.get(&key);
let did_change = prev != Some(&leaf);
last.insert(key, leaf);
did_change
};
if !changed && !*always && !*atomic {
continue;
}
if *passive_only {
let is_passive = match db_handle.get_record(record).await {
Some(rec) => matches!(
rec.read().await.common.scan,
epics_base_rs::server::record::ScanType::Passive
),
None => continue,
};
if !is_passive {
continue;
}
}
let mut visited = std::collections::HashSet::new();
if *atomic {
let _ = db_handle
.process_record_with_links_already_locked(record, &mut visited, 0)
.await;
} else {
let _ = db_handle
.process_record_with_links(record, &mut visited, 0)
.await;
}
}
drop(atomic_epoch);
}
}
fn extract_leaf(root: &PvField, path: &str) -> PvField {
if path.is_empty() {
return root.clone();
}
let mut cursor = root.clone();
for segment in path.split('.') {
cursor = match cursor {
PvField::Structure(s) => s.get_field(segment).cloned().unwrap_or(PvField::Null),
other => return other,
};
}
cursor
}
impl LinkSet for PvaLinkResolver {
fn is_connected(&self, name: &str) -> bool {
let Some(full) = strip_scheme(name) else {
return false;
};
let bare = strip_query(full);
match self.registry.try_get_any(bare, LinkDirection::Inp) {
Some(link) => link.is_connected(),
None => false,
}
}
fn get_value(&self, name: &str) -> Option<EpicsValue> {
if !self.is_enabled() {
return None;
}
let full = strip_scheme(name)?;
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&self.link_options, full);
}
let cfg = self.inp_cfg_for(full);
if let Some(link) = self.registry.try_get_any(bare, LinkDirection::Inp)
&& let Some(value) = link.try_read_cached_with_field(&cfg.field)
{
self.reads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return pvfield_to_epics_value(&value);
}
let field = cfg.field.clone();
let value = block_in_place_or_warn(|| {
self.handle.block_on(async {
let link = self.registry.get_or_open(cfg).await.ok()?;
link.read_with_field(&field).await.ok()
})
})?;
self.reads
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
pvfield_to_epics_value(&value)
}
fn put_value(&self, name: &str, value: EpicsValue) -> Result<(), String> {
if !self.is_enabled() {
return Err("pvalink disabled".into());
}
let full = strip_scheme(name).ok_or_else(|| {
format!("pvalink rejects ca:// scheme: {name} (use the CA-link path instead)")
})?;
let bare = strip_query(full);
if full != bare {
lazy_register_out_opts(&self.out_link_options, full);
}
let cfg = self.out_cfg_for(full);
let array_path = is_array_value(&value);
block_in_place_or_warn(|| {
self.handle.block_on(async {
let link = self
.registry
.get_or_open(cfg)
.await
.map_err(|e| e.to_string())?;
if array_path {
let pv_field = crate::convert::epics_to_pv_field(&value);
link.write_pv_field(&pv_field)
.await
.map_err(|e| e.to_string())
} else {
let value_str = value.to_string();
link.write(&value_str).await.map_err(|e| e.to_string())
}
})
})
}
fn alarm_message(&self, name: &str) -> Option<String> {
let full = strip_scheme(name)?;
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&self.link_options, full);
}
let cfg = self.inp_cfg_for(full);
let sevr = cfg.sevr;
let link = block_in_place_or_warn(|| {
self.handle
.block_on(async { self.registry.get_or_open(cfg).await.ok() })
})?;
link.alarm_message_with(sevr)
}
fn alarm_severity(&self, name: &str) -> Option<i32> {
let full = strip_scheme(name)?;
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&self.link_options, full);
}
let cfg = self.inp_cfg_for(full);
let sevr = cfg.sevr;
let link = block_in_place_or_warn(|| {
self.handle
.block_on(async { self.registry.get_or_open(cfg).await.ok() })
})?;
link.link_alarm_severity_with(sevr)
}
fn time_stamp(&self, name: &str) -> Option<(i64, i32)> {
let full = strip_scheme(name)?;
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&self.link_options, full);
}
let cfg = self.inp_cfg_for(full);
let want_time = cfg.time;
let link = block_in_place_or_warn(|| {
self.handle
.block_on(async { self.registry.get_or_open(cfg).await.ok() })
})?;
if !want_time {
return None;
}
link.time_stamp()
}
fn link_metadata(&self, name: &str) -> Option<epics_base_rs::server::database::LinkMetadata> {
let full = strip_scheme(name)?;
let bare = strip_query(full);
if full != bare {
lazy_register_inp_opts(&self.link_options, full);
}
let cfg = self.inp_cfg_for(full);
let field = cfg.field.clone();
let link = block_in_place_or_warn(|| {
self.handle
.block_on(async { self.registry.get_or_open(cfg).await.ok() })
})?;
link.link_metadata_with(&field)
}
fn link_names(&self) -> Vec<String> {
Vec::new()
}
}
fn strip_scheme(name: &str) -> Option<&str> {
if let Some(stripped) = name.strip_prefix("pva://") {
return Some(stripped);
}
if name.starts_with("ca://") {
return None;
}
Some(name)
}
fn strip_query(s: &str) -> &str {
s.split_once('?').map_or(s, |(bare, _)| bare)
}
fn lazy_register_inp_opts(
link_options: &parking_lot::RwLock<std::collections::HashMap<String, PvaLinkConfig>>,
full: &str,
) {
if link_options.read().contains_key(full) {
return;
}
if let Ok(cfg) = PvaLinkConfig::parse(&format!("pva://{full}"), LinkDirection::Inp) {
link_options.write().insert(
full.to_string(),
PvaLinkConfig {
monitor: true,
..cfg
},
);
}
}
fn lazy_register_out_opts(
out_link_options: &parking_lot::RwLock<std::collections::HashMap<String, PvaLinkConfig>>,
full: &str,
) {
if out_link_options.read().contains_key(full) {
return;
}
if let Ok(cfg) = PvaLinkConfig::parse(&format!("pva://{full}"), LinkDirection::Out) {
out_link_options.write().insert(full.to_string(), cfg);
}
}
fn default_inp_cfg(pv_name: &str) -> PvaLinkConfig {
PvaLinkConfig {
monitor: true,
..PvaLinkConfig::defaults_for(pv_name, LinkDirection::Inp)
}
}
fn is_array_value(value: &EpicsValue) -> bool {
match value {
EpicsValue::ShortArray(_)
| EpicsValue::FloatArray(_)
| EpicsValue::EnumArray(_)
| EpicsValue::DoubleArray(_)
| EpicsValue::LongArray(_)
| EpicsValue::CharArray(_)
| EpicsValue::Int64Array(_)
| EpicsValue::UInt64Array(_)
| EpicsValue::StringArray(_) => true,
EpicsValue::String(_)
| EpicsValue::Short(_)
| EpicsValue::Float(_)
| EpicsValue::Enum(_)
| EpicsValue::Char(_)
| EpicsValue::Long(_)
| EpicsValue::Double(_)
| EpicsValue::Int64(_)
| EpicsValue::UInt64(_) => false,
}
}
fn pvfield_to_epics_value(field: &PvField) -> Option<EpicsValue> {
match field {
PvField::Scalar(sv) => Some(scalar_to_epics(sv)),
PvField::Structure(s) => {
for (name, sub) in &s.fields {
if name == "value" {
return pvfield_to_epics_value(sub);
}
}
None
}
PvField::ScalarArray(arr) => {
let first = arr.first()?;
match first {
ScalarValue::Double(_) => Some(EpicsValue::DoubleArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Double(d) = s {
Some(*d)
} else {
None
}
})
.collect(),
)),
ScalarValue::Float(_) => Some(EpicsValue::FloatArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Float(f) = s {
Some(*f)
} else {
None
}
})
.collect(),
)),
ScalarValue::Int(_) => Some(EpicsValue::LongArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Int(i) = s {
Some(*i)
} else {
None
}
})
.collect(),
)),
ScalarValue::Long(_) => Some(EpicsValue::Int64Array(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Long(l) = s {
Some(*l)
} else {
None
}
})
.collect(),
)),
ScalarValue::Short(_) => Some(EpicsValue::ShortArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Short(v) = s {
Some(*v)
} else {
None
}
})
.collect(),
)),
ScalarValue::UShort(_) => Some(EpicsValue::ShortArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::UShort(v) = s {
Some(*v as i16)
} else {
None
}
})
.collect(),
)),
ScalarValue::UInt(_) => Some(EpicsValue::LongArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::UInt(v) = s {
Some(*v as i32)
} else {
None
}
})
.collect(),
)),
ScalarValue::ULong(_) => Some(EpicsValue::UInt64Array(
arr.iter()
.filter_map(|s| {
if let ScalarValue::ULong(v) = s {
Some(*v)
} else {
None
}
})
.collect(),
)),
ScalarValue::Byte(_) => Some(EpicsValue::ShortArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Byte(v) = s {
Some(*v as i16)
} else {
None
}
})
.collect(),
)),
ScalarValue::UByte(_) => Some(EpicsValue::CharArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::UByte(v) = s {
Some(*v)
} else {
None
}
})
.collect(),
)),
ScalarValue::String(_) => Some(EpicsValue::StringArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::String(v) = s {
Some(v.clone())
} else {
None
}
})
.collect(),
)),
ScalarValue::Boolean(_) => Some(EpicsValue::LongArray(
arr.iter()
.filter_map(|s| {
if let ScalarValue::Boolean(v) = s {
Some(if *v { 1 } else { 0 })
} else {
None
}
})
.collect(),
)),
}
}
PvField::ScalarArrayTyped(arr) => {
use epics_pva_rs::pvdata::TypedScalarArray;
match arr {
TypedScalarArray::Double(a) => Some(EpicsValue::DoubleArray(a.to_vec())),
TypedScalarArray::Float(a) => Some(EpicsValue::FloatArray(a.to_vec())),
TypedScalarArray::Int(a) => Some(EpicsValue::LongArray(a.to_vec())),
TypedScalarArray::Long(a) => Some(EpicsValue::Int64Array(a.to_vec())),
TypedScalarArray::Short(a) => Some(EpicsValue::ShortArray(a.to_vec())),
TypedScalarArray::UShort(a) => Some(EpicsValue::ShortArray(
a.iter().map(|v| *v as i16).collect(),
)),
TypedScalarArray::UInt(a) => {
Some(EpicsValue::LongArray(a.iter().map(|v| *v as i32).collect()))
}
TypedScalarArray::ULong(a) => Some(EpicsValue::UInt64Array(a.to_vec())),
TypedScalarArray::Byte(a) => Some(EpicsValue::ShortArray(
a.iter().map(|v| *v as i16).collect(),
)),
TypedScalarArray::UByte(a) => Some(EpicsValue::CharArray(a.to_vec())),
TypedScalarArray::String(a) => Some(EpicsValue::StringArray(a.to_vec())),
TypedScalarArray::Boolean(a) => Some(EpicsValue::LongArray(
a.iter().map(|v| if *v { 1 } else { 0 }).collect(),
)),
}
}
_ => None,
}
}
fn scalar_to_epics(sv: &ScalarValue) -> EpicsValue {
match sv {
ScalarValue::Double(v) => EpicsValue::Double(*v),
ScalarValue::Float(v) => EpicsValue::Float(*v),
ScalarValue::Long(v) => EpicsValue::Int64(*v),
ScalarValue::Int(v) => EpicsValue::Long(*v),
ScalarValue::Short(v) => EpicsValue::Short(*v),
ScalarValue::Byte(v) => EpicsValue::Char(*v as u8),
ScalarValue::ULong(v) => EpicsValue::UInt64(*v),
ScalarValue::UInt(v) => EpicsValue::Long(*v as i32),
ScalarValue::UShort(v) => EpicsValue::Short(*v as i16),
ScalarValue::UByte(v) => EpicsValue::Short(*v as i16),
ScalarValue::Boolean(v) => EpicsValue::Long(if *v { 1 } else { 0 }),
ScalarValue::String(s) => EpicsValue::String(s.clone()),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pvfield_scalar_to_epics_double() {
let f = PvField::Scalar(ScalarValue::Double(2.5));
assert_eq!(pvfield_to_epics_value(&f), Some(EpicsValue::Double(2.5)));
}
#[test]
fn pvfield_struct_with_value_extracts() {
use epics_pva_rs::pvdata::PvStructure;
let mut s = PvStructure::new("epics:nt/NTScalar:1.0");
s.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Long(42))));
let f = PvField::Structure(s);
assert_eq!(pvfield_to_epics_value(&f), Some(EpicsValue::Int64(42)));
}
#[test]
fn pvfield_array_conversions_cover_pvxs_shapes() {
use epics_pva_rs::pvdata::TypedScalarArray;
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::Float(1.5),
ScalarValue::Float(-2.5),
])),
Some(EpicsValue::FloatArray(vec![1.5, -2.5]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::Short(-7),
ScalarValue::Short(8),
])),
Some(EpicsValue::ShortArray(vec![-7, 8]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::UByte(0x55),
ScalarValue::UByte(0xFF),
])),
Some(EpicsValue::CharArray(vec![0x55, 0xFF]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::Byte(-1),
ScalarValue::Byte(2),
])),
Some(EpicsValue::ShortArray(vec![-1, 2]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::String("a".into()),
ScalarValue::String("b".into()),
])),
Some(EpicsValue::StringArray(vec!["a".into(), "b".into()]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArrayTyped(TypedScalarArray::Float(
vec![3.25f32, -4.5].into()
))),
Some(EpicsValue::FloatArray(vec![3.25, -4.5]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArrayTyped(TypedScalarArray::String(
vec!["x".to_string(), "y".to_string()].into()
))),
Some(EpicsValue::StringArray(vec!["x".into(), "y".into()]))
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArrayTyped(TypedScalarArray::UByte(
vec![1u8, 2, 3].into()
))),
Some(EpicsValue::CharArray(vec![1, 2, 3]))
);
}
#[test]
fn ex_r8_inp_long_ulong_preserve_full_width() {
use epics_pva_rs::pvdata::TypedScalarArray;
let big_u: u64 = 0x1234_5678_9ABC_DEF0;
let big_i: i64 = -0x0123_4567_89AB_CDEF;
assert_eq!(
pvfield_to_epics_value(&PvField::Scalar(ScalarValue::ULong(big_u))),
Some(EpicsValue::UInt64(big_u)),
"remote ulong scalar must keep all 64 bits"
);
assert_eq!(
pvfield_to_epics_value(&PvField::Scalar(ScalarValue::Long(big_i))),
Some(EpicsValue::Int64(big_i)),
"remote long scalar must keep all 64 bits"
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::ULong(big_u),
ScalarValue::ULong(1),
])),
Some(EpicsValue::UInt64Array(vec![big_u, 1])),
"remote ulong[] must keep all 64 bits per element"
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArray(vec![
ScalarValue::Long(big_i),
ScalarValue::Long(-1),
])),
Some(EpicsValue::Int64Array(vec![big_i, -1])),
"remote long[] must keep all 64 bits per element"
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArrayTyped(TypedScalarArray::ULong(
vec![big_u, 2].into()
))),
Some(EpicsValue::UInt64Array(vec![big_u, 2])),
"typed remote ulong[] must keep all 64 bits per element"
);
assert_eq!(
pvfield_to_epics_value(&PvField::ScalarArrayTyped(TypedScalarArray::Long(
vec![big_i, -2].into()
))),
Some(EpicsValue::Int64Array(vec![big_i, -2])),
"typed remote long[] must keep all 64 bits per element"
);
}
use crate::pvalink::config::SevrMode;
use epics_pva_rs::pvdata::PvStructure;
fn nt_scalar(v: f64) -> PvField {
let mut s = PvStructure::new("epics:nt/NTScalar:1.0");
s.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(v))));
PvField::Structure(s)
}
struct CountingRecord {
count: Arc<std::sync::atomic::AtomicU32>,
}
impl epics_base_rs::server::record::Record for CountingRecord {
fn record_type(&self) -> &'static str {
"ai"
}
fn process(
&mut self,
) -> epics_base_rs::error::CaResult<epics_base_rs::server::record::ProcessOutcome> {
self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(epics_base_rs::server::record::ProcessOutcome::complete())
}
fn get_field(&self, _name: &str) -> Option<EpicsValue> {
Some(EpicsValue::Double(0.0))
}
fn put_field(
&mut self,
_name: &str,
_value: EpicsValue,
) -> epics_base_rs::error::CaResult<()> {
Ok(())
}
fn field_list(&self) -> &'static [epics_base_rs::server::record::FieldDesc] {
&[]
}
}
#[tokio::test]
async fn b3_forwarder_processes_owning_record_on_update() {
let db = PvDatabase::new();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
db.add_record(
"DEST",
Box::new(CountingRecord {
count: count.clone(),
}),
)
.await
.unwrap();
let mut fanout = ScanFanout::default();
fanout.records.push(ScanTarget {
record: "DEST".to_string(),
always: true,
monorder: 0,
atomic: false,
passive_only: false,
field: String::new(),
});
let scan_targets: ScanTargetMap =
Arc::new(parking_lot::RwLock::new(std::collections::HashMap::from([
("SRC".to_string(), fanout),
])));
let db_slot = Arc::new(parking_lot::RwLock::new(Some(db)));
let (tx, rx) = tokio::sync::mpsc::channel::<PvField>(8);
let forwarder = tokio::spawn(run_notify_forwarder(
"SRC".to_string(),
rx,
scan_targets,
db_slot,
));
tx.send(nt_scalar(1.0)).await.unwrap();
tx.send(nt_scalar(2.0)).await.unwrap();
drop(tx); forwarder.await.unwrap();
assert_eq!(count.load(std::sync::atomic::Ordering::SeqCst), 2);
}
#[tokio::test]
async fn b3_forwarder_skips_unchanged_value_when_not_always() {
let db = PvDatabase::new();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
db.add_record(
"DEST",
Box::new(CountingRecord {
count: count.clone(),
}),
)
.await
.unwrap();
let mut fanout = ScanFanout::default();
fanout.records.push(ScanTarget {
record: "DEST".to_string(),
always: false,
monorder: 0,
atomic: false,
passive_only: false,
field: String::new(),
});
let scan_targets: ScanTargetMap =
Arc::new(parking_lot::RwLock::new(std::collections::HashMap::from([
("SRC".to_string(), fanout),
])));
let db_slot = Arc::new(parking_lot::RwLock::new(Some(db)));
let (tx, rx) = tokio::sync::mpsc::channel::<PvField>(8);
let forwarder = tokio::spawn(run_notify_forwarder(
"SRC".to_string(),
rx,
scan_targets,
db_slot,
));
tx.send(nt_scalar(1.0)).await.unwrap();
tx.send(nt_scalar(1.0)).await.unwrap();
tx.send(nt_scalar(3.0)).await.unwrap();
drop(tx);
forwarder.await.unwrap();
assert_eq!(count.load(std::sync::atomic::Ordering::SeqCst), 2);
}
#[tokio::test]
async fn b3_forwarder_propagates_flnk_chain() {
let db = PvDatabase::new();
let dest_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
let down_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
db.add_record(
"DEST",
Box::new(CountingRecord {
count: dest_count.clone(),
}),
)
.await
.unwrap();
db.add_record(
"DOWNSTREAM",
Box::new(CountingRecord {
count: down_count.clone(),
}),
)
.await
.unwrap();
{
let rec = db.get_record("DEST").await.expect("DEST exists");
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String("DOWNSTREAM".into()))
.expect("set FLNK");
}
let mut fanout = ScanFanout::default();
fanout.records.push(ScanTarget {
record: "DEST".to_string(),
always: true,
monorder: 0,
atomic: false,
passive_only: false,
field: String::new(),
});
let scan_targets: ScanTargetMap =
Arc::new(parking_lot::RwLock::new(std::collections::HashMap::from([
("SRC".to_string(), fanout),
])));
let db_slot = Arc::new(parking_lot::RwLock::new(Some(db)));
let (tx, rx) = tokio::sync::mpsc::channel::<PvField>(8);
let forwarder = tokio::spawn(run_notify_forwarder(
"SRC".to_string(),
rx,
scan_targets,
db_slot,
));
tx.send(nt_scalar(5.0)).await.unwrap();
drop(tx);
forwarder.await.unwrap();
assert_eq!(dest_count.load(std::sync::atomic::Ordering::SeqCst), 1);
assert_eq!(
down_count.load(std::sync::atomic::Ordering::SeqCst),
1,
"FLNK target must process via process_record_with_links"
);
}
#[tokio::test]
async fn b3_open_link_for_record_registers_scan_target() {
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let _ = resolver
.open_link_for_record("pva://SRC:PV?proc=CP&sevr=MS", "MY:REC")
.await;
let targets = resolver.scan_targets.read();
let fanout = targets.get("SRC:PV").expect("scan target registered");
assert_eq!(fanout.records.len(), 1);
assert_eq!(fanout.records[0].record, "MY:REC");
drop(targets);
let opts = resolver.link_options.read();
let cfg = opts
.get("SRC:PV?proc=CP&sevr=MS")
.expect("link options retained");
assert_eq!(cfg.sevr, SevrMode::Ms);
assert!(cfg.scan_on_update);
}
#[tokio::test]
async fn b3_non_cp_link_registers_no_scan_target() {
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let _ = resolver
.open_link_for_record("pva://OTHER:PV?proc=NPP", "REC2")
.await;
assert!(resolver.scan_targets.read().get("OTHER:PV").is_none());
}
#[tokio::test]
async fn b2_open_link_retains_sevr_mode() {
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let _ = resolver.open_link("pva://A:PV?sevr=MSI").await;
let cfg = resolver.inp_cfg_for("A:PV?sevr=MSI").clone();
assert_eq!(cfg.sevr, SevrMode::Msi);
assert_eq!(resolver.inp_cfg_for("UNSEEN").sevr, SevrMode::Nms);
}
#[test]
fn extract_leaf_walks_dotted_path() {
let mut alarm = PvStructure::new("alarm_t");
alarm
.fields
.push(("severity".into(), PvField::Scalar(ScalarValue::Int(2))));
let mut root = PvStructure::new("epics:nt/NTScalar:1.0");
root.fields
.push(("alarm".into(), PvField::Structure(alarm)));
let leaf = extract_leaf(&PvField::Structure(root), "alarm.severity");
assert!(matches!(leaf, PvField::Scalar(ScalarValue::Int(2))));
}
struct OrderRecord {
name: &'static str,
log: Arc<parking_lot::Mutex<Vec<&'static str>>>,
}
impl epics_base_rs::server::record::Record for OrderRecord {
fn record_type(&self) -> &'static str {
"ai"
}
fn process(
&mut self,
) -> epics_base_rs::error::CaResult<epics_base_rs::server::record::ProcessOutcome> {
self.log.lock().push(self.name);
Ok(epics_base_rs::server::record::ProcessOutcome::complete())
}
fn get_field(&self, _n: &str) -> Option<EpicsValue> {
Some(EpicsValue::Double(0.0))
}
fn put_field(&mut self, _n: &str, _v: EpicsValue) -> epics_base_rs::error::CaResult<()> {
Ok(())
}
fn field_list(&self) -> &'static [epics_base_rs::server::record::FieldDesc] {
&[]
}
}
#[tokio::test]
async fn b4_forwarder_orders_by_atomic_then_monorder() {
let db = PvDatabase::new();
let log = Arc::new(parking_lot::Mutex::new(Vec::new()));
for name in ["A", "B", "C", "D"] {
db.add_record(
name,
Box::new(OrderRecord {
name,
log: log.clone(),
}),
)
.await
.unwrap();
}
let mut fanout = ScanFanout::default();
fanout.records.push(ScanTarget {
record: "C".into(),
always: true,
monorder: 1,
atomic: false,
passive_only: false,
field: String::new(),
});
fanout.records.push(ScanTarget {
record: "D".into(),
always: true,
monorder: -1,
atomic: false,
passive_only: false,
field: String::new(),
});
fanout.records.push(ScanTarget {
record: "A".into(),
always: true,
monorder: 5,
atomic: true,
passive_only: false,
field: String::new(),
});
fanout.records.push(ScanTarget {
record: "B".into(),
always: true,
monorder: 0,
atomic: true,
passive_only: false,
field: String::new(),
});
let scan_targets: ScanTargetMap =
Arc::new(parking_lot::RwLock::new(std::collections::HashMap::from([
("SRC".to_string(), fanout),
])));
let db_slot = Arc::new(parking_lot::RwLock::new(Some(db)));
let (tx, rx) = tokio::sync::mpsc::channel::<PvField>(4);
let forwarder = tokio::spawn(run_notify_forwarder(
"SRC".to_string(),
rx,
scan_targets,
db_slot,
));
tx.send(nt_scalar(1.0)).await.unwrap();
drop(tx);
forwarder.await.unwrap();
assert_eq!(*log.lock(), vec!["B", "A", "D", "C"]);
}
#[tokio::test]
async fn b4_atomic_scans_even_on_no_op_update() {
let db = PvDatabase::new();
let count = Arc::new(std::sync::atomic::AtomicU32::new(0));
db.add_record(
"DEST",
Box::new(CountingRecord {
count: count.clone(),
}),
)
.await
.unwrap();
let mut fanout = ScanFanout::default();
fanout.records.push(ScanTarget {
record: "DEST".into(),
always: false, monorder: 0,
atomic: true, passive_only: false,
field: String::new(),
});
let scan_targets: ScanTargetMap =
Arc::new(parking_lot::RwLock::new(std::collections::HashMap::from([
("SRC".to_string(), fanout),
])));
let db_slot = Arc::new(parking_lot::RwLock::new(Some(db)));
let (tx, rx) = tokio::sync::mpsc::channel::<PvField>(4);
let forwarder = tokio::spawn(run_notify_forwarder(
"SRC".to_string(),
rx,
scan_targets,
db_slot,
));
tx.send(nt_scalar(1.0)).await.unwrap();
tx.send(nt_scalar(1.0)).await.unwrap();
drop(tx);
forwarder.await.unwrap();
assert_eq!(count.load(std::sync::atomic::Ordering::SeqCst), 2);
}
#[tokio::test]
async fn b4_local_link_rejects_non_local_pv() {
let db = Arc::new(PvDatabase::new());
let resolver = install_pvalink_resolver(&db, tokio::runtime::Handle::current()).await;
let r = resolver
.open_link("pva://NOT:A:LOCAL:RECORD?local=true")
.await;
let rejected = matches!(r, Err(PvaLinkError::NotLocal(_)));
assert!(rejected, "local link to a non-local PV must be rejected");
}
#[tokio::test]
async fn b4_non_local_link_to_remote_pv_is_allowed() {
let db = Arc::new(PvDatabase::new());
let resolver = install_pvalink_resolver(&db, tokio::runtime::Handle::current()).await;
let r = resolver.open_link("pva://SOME:REMOTE:PV").await;
assert!(r.is_ok(), "non-local link should open");
}
#[tokio::test]
async fn b4_local_link_accepts_simple_pv() {
let db = Arc::new(PvDatabase::new());
db.add_pv("LOCAL:SIMPLE:PV", EpicsValue::Double(3.0))
.await
.unwrap();
let resolver = install_pvalink_resolver(&db, tokio::runtime::Handle::current()).await;
let r = resolver.open_link("pva://LOCAL:SIMPLE:PV?local=true").await;
assert!(
r.is_ok(),
"local link to a simple add_pv PV must be accepted"
);
}
#[cfg(feature = "qsrv")]
#[tokio::test]
async fn b4_local_link_accepts_qsrv_group_pv() {
use crate::qsrv::BridgeProvider;
use epics_base_rs::server::records::ai::AiRecord;
let db = Arc::new(PvDatabase::new());
db.add_record("GRP:level", Box::new(AiRecord::new(1.0)))
.await
.unwrap();
db.add_record("GRP:count", Box::new(AiRecord::new(2.0)))
.await
.unwrap();
const GROUP_JSON: &str = r#"{
"LOCAL:GROUP": {
"+id": "epics:nt/NTGroup:1.0",
"level": { "+channel": "GRP:level.VAL", "+type": "plain" },
"count": { "+channel": "GRP:count.VAL", "+type": "plain" }
}
}"#;
let provider = Arc::new(BridgeProvider::new(db.clone()));
provider.load_group_config(GROUP_JSON).expect("load group");
provider.process_groups();
assert!(
provider.has_group_pv("LOCAL:GROUP"),
"group PV must be registered in the provider"
);
let resolver = install_pvalink_resolver(&db, tokio::runtime::Handle::current()).await;
resolver.attach_qsrv_provider(provider);
let r = resolver.open_link("pva://LOCAL:GROUP?local=true").await;
assert!(
r.is_ok(),
"local link to a QSRV group composite PV must be accepted, got err {:?}",
r.err()
);
let remote = resolver
.open_link("pva://OFF:SITE:REMOTE:PV?local=true")
.await;
assert!(
matches!(remote, Err(PvaLinkError::NotLocal(_))),
"local link to a remote-only PV must still be rejected, got err {:?}",
remote.err()
);
}
#[tokio::test]
async fn br_r10_db_json_pvalink_options_preserved() {
use epics_base_rs::server::record::{ParsedLink, parse_link_v2};
let json =
r#"{pva: {pv: "TARGET:AI", field: "display.precision", proc: "CPP", sevr: "MS"}}"#;
let stored = match parse_link_v2(json) {
ParsedLink::Pva(s) => s,
other => panic!("expected Pva, got {other:?}"),
};
assert!(
stored.contains("field=display.precision"),
"field option must survive parse: {stored}"
);
assert!(stored.contains("proc=CPP"), "proc must survive: {stored}");
assert!(stored.contains("sevr=MS"), "sevr must survive: {stored}");
let cfg = PvaLinkConfig::parse(&format!("pva://{stored}"), LinkDirection::Inp).unwrap();
assert_eq!(cfg.pv_name, "TARGET:AI");
assert_eq!(cfg.field, "display.precision");
assert!(cfg.scan_on_update, "CPP → scan_on_update");
assert!(cfg.scan_on_passive, "CPP → scan_on_passive");
assert_eq!(cfg.sevr, SevrMode::Ms);
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let _ = resolver
.open_link_for_record(&format!("pva://{stored}"), "MY:RECORD")
.await;
let cfg = resolver.inp_cfg_for(&stored);
assert_eq!(
cfg.field, "display.precision",
"field option must be registered (was 'value' before fix)"
);
assert_eq!(cfg.sevr, SevrMode::Ms, "sevr option must be registered");
assert!(cfg.scan_on_update, "CPP scan_on_update must be registered");
assert!(
cfg.scan_on_passive,
"CPP scan_on_passive must be registered"
);
let targets = resolver.scan_targets.read();
let fanout = targets
.get("TARGET:AI")
.expect("CPP target must be registered");
assert_eq!(fanout.records[0].record, "MY:RECORD");
assert!(fanout.records[0].passive_only, "CPP must set passive_only");
assert_eq!(
fanout.records[0].field, "display.precision",
"ScanTarget.field must reflect the per-link field selector"
);
}
#[tokio::test]
async fn br_r27_pvalink_cache_separates_per_link_options() {
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let link_a = "pva://TARGET:PV?field=alarm.severity&proc=CPP";
let _ = resolver.open_link_for_record(link_a, "RECORD:A").await;
let link_b = "pva://TARGET:PV?field=value&proc=CP";
let _ = resolver.open_link_for_record(link_b, "RECORD:B").await;
let cfg_a = resolver.inp_cfg_for("TARGET:PV?field=alarm.severity&proc=CPP");
let cfg_b = resolver.inp_cfg_for("TARGET:PV?field=value&proc=CP");
assert_eq!(
cfg_a.field, "alarm.severity",
"link A field must not be overwritten by link B"
);
assert_eq!(
cfg_b.field, "value",
"link B field must retain its own value"
);
assert!(
cfg_a.scan_on_passive,
"link A CPP must set scan_on_passive; link B's CP must not clobber it"
);
assert!(
!cfg_b.scan_on_passive,
"link B CP must not be passive-only; link A must not propagate"
);
let targets = resolver.scan_targets.read();
let fanout = targets
.get("TARGET:PV")
.expect("scan targets registered for TARGET:PV");
let rec_a = fanout
.records
.iter()
.find(|t| t.record == "RECORD:A")
.expect("RECORD:A must be in scan targets");
let rec_b = fanout
.records
.iter()
.find(|t| t.record == "RECORD:B")
.expect("RECORD:B must be in scan targets");
assert_eq!(
rec_a.field, "alarm.severity",
"RECORD:A ScanTarget.field wrong"
);
assert_eq!(rec_b.field, "value", "RECORD:B ScanTarget.field wrong");
assert!(rec_a.passive_only, "RECORD:A must be CPP (passive_only)");
assert!(
!rec_b.passive_only,
"RECORD:B must be CP (not passive_only)"
);
}
#[cfg(feature = "qsrv")]
#[tokio::test]
async fn b4_local_gate_without_qsrv_still_rejects_remote() {
let db = Arc::new(PvDatabase::new());
let resolver = install_pvalink_resolver(&db, tokio::runtime::Handle::current()).await;
let r = resolver
.open_link("pva://NO:QSRV:REMOTE:PV?local=true")
.await;
assert!(
matches!(r, Err(PvaLinkError::NotLocal(_))),
"without a QSRV handle a non-local link must still be rejected"
);
}
struct SlowLoggingRecord {
name: &'static str,
log: Arc<parking_lot::Mutex<Vec<String>>>,
epoch_entered: Option<Arc<tokio::sync::Notify>>,
}
impl epics_base_rs::server::record::Record for SlowLoggingRecord {
fn record_type(&self) -> &'static str {
"ai"
}
fn process(
&mut self,
) -> epics_base_rs::error::CaResult<epics_base_rs::server::record::ProcessOutcome> {
self.log.lock().push(self.name.to_string());
if let Some(n) = &self.epoch_entered {
n.notify_one();
}
std::thread::sleep(std::time::Duration::from_millis(40));
Ok(epics_base_rs::server::record::ProcessOutcome::complete())
}
fn get_field(&self, _name: &str) -> Option<EpicsValue> {
Some(EpicsValue::Double(0.0))
}
fn put_field(
&mut self,
_name: &str,
_value: EpicsValue,
) -> epics_base_rs::error::CaResult<()> {
Ok(())
}
fn field_list(&self) -> &'static [epics_base_rs::server::record::FieldDesc] {
&[]
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn br_r18_atomic_scan_holds_multi_record_lock_epoch() {
let db = PvDatabase::new();
let log = Arc::new(parking_lot::Mutex::new(Vec::new()));
let epoch_entered = Arc::new(tokio::sync::Notify::new());
for name in ["AT:A", "AT:B"] {
db.add_record(
name,
Box::new(SlowLoggingRecord {
name,
log: log.clone(),
epoch_entered: (name == "AT:A").then(|| epoch_entered.clone()),
}),
)
.await
.unwrap();
}
let mut fanout = ScanFanout::default();
fanout.records.push(ScanTarget {
record: "AT:A".into(),
always: true,
monorder: 0,
atomic: true,
passive_only: false,
field: "value".into(),
});
fanout.records.push(ScanTarget {
record: "AT:B".into(),
always: true,
monorder: 1,
atomic: true,
passive_only: false,
field: "value".into(),
});
let scan_targets: ScanTargetMap =
Arc::new(parking_lot::RwLock::new(std::collections::HashMap::from([
("SRC".to_string(), fanout),
])));
let db_slot = Arc::new(parking_lot::RwLock::new(Some(db.clone())));
let (tx, rx) = tokio::sync::mpsc::channel::<PvField>(4);
let forwarder = tokio::spawn(run_notify_forwarder(
"SRC".to_string(),
rx,
scan_targets,
db_slot,
));
let competitor_log = log.clone();
let competitor_db = db.clone();
let competitor = tokio::spawn(async move {
epoch_entered.notified().await;
let _epoch = competitor_db.lock_records(&["AT:B".to_string()]).await;
competitor_log.lock().push("EXTERNAL".to_string());
});
tx.send(nt_scalar(1.0)).await.unwrap();
drop(tx);
forwarder.await.unwrap();
competitor.await.unwrap();
assert_eq!(
*log.lock(),
vec!["AT:A", "AT:B", "EXTERNAL"],
"external writer must not interleave between atomic scan targets"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mr_r15_getters_use_caller_options_not_shared_link() {
use crate::pvalink::link::PvaLink;
use epics_pva_rs::pvdata::PvField;
let mut alarm = PvStructure::new("alarm_t");
alarm
.fields
.push(("severity".into(), PvField::Scalar(ScalarValue::Int(2))));
alarm.fields.push((
"message".into(),
PvField::Scalar(ScalarValue::String("HIGH".into())),
));
let mut ts = PvStructure::new("time_t");
ts.fields.push((
"secondsPastEpoch".into(),
PvField::Scalar(ScalarValue::Long(1_700_000_000)),
));
ts.fields
.push(("nanoseconds".into(), PvField::Scalar(ScalarValue::Int(42))));
let mut root = PvStructure::new("epics:nt/NTScalar:1.0");
root.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(3.0))));
root.fields
.push(("alarm".into(), PvField::Structure(alarm)));
root.fields
.push(("timeStamp".into(), PvField::Structure(ts)));
let cached = PvField::Structure(root);
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let shared_cfg = default_inp_cfg("MR_R15:PV");
let shared_link = std::sync::Arc::new(PvaLink::for_test(shared_cfg.clone(), Some(cached)));
resolver.registry.insert_for_test(&shared_cfg, shared_link);
let nms_name = "pva://MR_R15:PV";
assert_eq!(
LinkSet::alarm_severity(&resolver, nms_name),
None,
"an NMS caller must not propagate the remote alarm"
);
let ms_name = "pva://MR_R15:PV?sevr=MS";
assert_eq!(
LinkSet::alarm_severity(&resolver, ms_name),
Some(2),
"an MS caller must see MAJOR even though the cached link is NMS"
);
assert_eq!(
LinkSet::alarm_message(&resolver, ms_name),
Some("HIGH".to_string()),
"an MS caller must get the remote alarm message"
);
assert_eq!(
resolver.link_alarm_severity(ms_name),
Some(2),
"resolver-level link_alarm_severity must use the caller's MS mode"
);
assert_eq!(
LinkSet::time_stamp(&resolver, nms_name),
None,
"a time=false caller must not adopt the upstream timestamp"
);
assert_eq!(
LinkSet::time_stamp(&resolver, "pva://MR_R15:PV?time=true"),
Some((1_700_000_000, 42)),
"a time=true caller must adopt the upstream timestamp"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mr_r23_out_uint64_array_uses_typed_path() {
use epics_pva_rs::pvdata::ScalarType;
out_array_typed_path_case(
"MR_R23:PV",
ScalarType::ULong,
EpicsValue::UInt64Array(vec![1, 2, u64::MAX]),
&[1, 2, u64::MAX as i64],
)
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn ex_r10_out_int64_array_uses_typed_path() {
use epics_pva_rs::pvdata::ScalarType;
out_array_typed_path_case(
"EX_R10:PV",
ScalarType::Long,
EpicsValue::Int64Array(vec![-3, 0, i64::MAX]),
&[-3, 0, i64::MAX],
)
.await;
}
async fn out_array_typed_path_case(
pv_name: &str,
elem_type: epics_pva_rs::pvdata::ScalarType,
value: EpicsValue,
expected: &[i64],
) {
use crate::pvalink::link::PvaLink;
use epics_pva_rs::client::PvaClient;
use epics_pva_rs::pvdata::{FieldDesc, PvField, ScalarValue};
use epics_pva_rs::server_native::{PvaServer, SharedPV, SharedSource};
let desc = FieldDesc::Structure {
struct_id: "epics:nt/NTScalarArray:1.0".into(),
fields: vec![("value".into(), FieldDesc::ScalarArray(elem_type))],
};
let initial = PvField::Structure(epics_pva_rs::pvdata::PvStructure {
struct_id: "epics:nt/NTScalarArray:1.0".into(),
fields: vec![("value".into(), PvField::ScalarArray(vec![]))],
});
let pv = SharedPV::new();
pv.open(desc, initial);
let source = SharedSource::new();
source.add(pv_name, pv.clone());
let server =
PvaServer::isolated(std::sync::Arc::new(source)).expect("test PVA server starts");
let addr = server.tcp_addr();
let resolver = PvaLinkResolver::new(tokio::runtime::Handle::current());
let out_cfg = resolver.out_cfg_for(pv_name);
let client = PvaClient::builder()
.server_addr(addr)
.timeout(std::time::Duration::from_secs(3))
.build();
let link = std::sync::Arc::new(PvaLink::for_test_with_client(out_cfg.clone(), client));
resolver.registry.insert_for_test(&out_cfg, link);
let scheme_name = format!("pva://{pv_name}");
LinkSet::put_value(&resolver, &scheme_name, value)
.expect("typed array OUT write must succeed");
tokio::time::sleep(std::time::Duration::from_millis(80)).await;
let current = pv.current().expect("PV has a current value");
let PvField::Structure(s) = current else {
panic!("expected structure value");
};
let value_field = s.get_field("value").expect("value sub-field present");
let elem_as_i64 = |sv: &ScalarValue| -> i64 {
match sv {
ScalarValue::Long(x) => *x,
ScalarValue::ULong(x) => *x as i64,
other => panic!("expected a 64-bit element, got {other:?}"),
}
};
let got: Vec<i64> = match value_field {
PvField::ScalarArray(v) => v.iter().map(elem_as_i64).collect(),
PvField::ScalarArrayTyped(t) => t.to_scalar_values().iter().map(elem_as_i64).collect(),
other => panic!("expected an array value field, got {other:?}"),
};
assert_eq!(
got, expected,
"typed-array OUT write must land the full-width 64-bit array value"
);
}
}