use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use epics_pva_rs::client::PvaClient;
use epics_pva_rs::pv_request::PvRequestExpr;
use epics_pva_rs::pvdata::{PvField, PvStructure, ScalarValue};
use super::config::{LinkDirection, PvaLinkConfig};
/// Errors produced by [`PvaLink`] operations.
#[derive(Debug, thiserror::Error)]
pub enum PvaLinkError {
/// An error reported by the underlying PVA client (converted via `From`).
#[error("PVA error: {0}")]
Pva(#[from] epics_pva_rs::error::PvaError),
/// A write or flush was requested on a link whose direction is `Inp`.
#[error("link is INP-only, write requested")]
NotWritable,
/// A read was requested on a link whose direction is `Out`.
#[error("link is OUT-only, read requested")]
NotReadable,
/// NOTE(review): not constructed anywhere in this file; per its message it
/// reports a field name that is absent from the remote structure.
#[error("field {0:?} not found in remote NT structure")]
FieldNotFound(String),
/// The named field could not be converted to a scalar number.
#[error("field {0:?} is not a scalar")]
NotScalar(String),
/// The link configuration failed to parse (converted via `From`).
#[error("link config parse error: {0}")]
Config(#[from] super::config::PvaLinkParseError),
/// The pending-put queue already holds the reported number of entries and
/// cannot accept another one.
#[error("retry queue full ({0} pending puts)")]
RetryQueueFull(usize),
/// NOTE(review): not constructed anywhere in this file; per its message it
/// reports a local-only link without a matching local record.
#[error("local-only link {0:?} has no matching local record")]
NotLocal(String),
}
/// Shorthand for a `Result` whose error type is [`PvaLinkError`].
pub type PvaLinkResult<T> = Result<T, PvaLinkError>;
/// A single PVA link: wraps a `PvaClient` together with the link
/// configuration, the most recently cached value and a queue of pending puts.
pub struct PvaLink {
// Guard that aborts the background monitor task when the link is dropped;
// `None` when no monitor task was spawned.
_monitor_abort: Option<MonitorAbort>,
// Link configuration supplied by the caller.
config: PvaLinkConfig,
// Client used for gets, puts and monitor subscriptions.
client: PvaClient,
// Most recent value stored by the monitor callback (or seeded by the test
// constructors); `None` until a value has been stored.
latest: Arc<Mutex<Option<PvField>>>,
// Set to `true` by every monitor event and to `false` whenever the monitor
// call returns; `None` for links without a monitor task.
monitor_connected: Option<Arc<AtomicBool>>,
// Receiving half of the channel the monitor callback publishes into; handed
// out through `take_notify_rx`.
notify_rx: Mutex<Option<mpsc::Receiver<PvField>>>,
// Puts that were deferred or queued for retry, in submission order.
put_queue: Mutex<Vec<QueuedPut>>,
}
/// A put waiting in the link's queue, kept in the form it was submitted in.
#[derive(Debug, Clone)]
enum QueuedPut {
/// Value submitted as a string (`write` / `write_with_block`).
Str(String),
/// Value submitted as a typed `PvField` (`write_pv_field*`).
Field(PvField),
}
/// Upper bound on queued puts; `enqueue_put` rejects new entries once the
/// queue holds this many.
const MAX_PUT_QUEUE: usize = 1024;
/// Owns the abort handle of the background monitor task and aborts that task
/// when dropped.
struct MonitorAbort(tokio::task::AbortHandle);
impl Drop for MonitorAbort {
fn drop(&mut self) {
// Stop the spawned monitor loop; it would otherwise loop forever.
self.0.abort();
}
}
impl PvaLink {
/// Opens a link described by `config`.
///
/// A client with a 5 second timeout is always created. When the link is an
/// `Inp` link with `monitor` enabled, a background task is spawned that
/// subscribes to `config.pv_name` and, for every event, marks the link as
/// connected, caches the value in `latest` and offers it (non-blocking,
/// dropped when the channel is full) to the notification channel.
///
/// Whenever the subscription call returns, the task marks the link as
/// disconnected, logs the outcome, sleeps for the current backoff and
/// subscribes again. The backoff starts at 250 ms and doubles up to 30 s;
/// it is reset to 250 ms whenever the attempt that just ended had delivered
/// at least one event, so a link that has been healthy does not inherit the
/// delay accumulated by unrelated earlier failures.
///
/// The task is aborted when the returned link is dropped.
///
/// # Errors
///
/// The visible code has no failing path; the `Result` is kept for interface
/// stability.
pub async fn open(config: PvaLinkConfig) -> PvaLinkResult<Self> {
    const INITIAL_BACKOFF: Duration = Duration::from_millis(250);
    const MAX_BACKOFF: Duration = Duration::from_secs(30);

    let client = PvaClient::builder().timeout(Duration::from_secs(5)).build();
    let latest = Arc::new(Mutex::new(None));
    let mut notify_rx = None;
    let mut monitor_abort = None;
    let mut monitor_connected = None;
    if matches!(config.direction, LinkDirection::Inp) && config.monitor {
        let (tx, rx) = mpsc::channel::<PvField>(config.queue_size.max(1));
        notify_rx = Some(rx);
        let pv_name = config.pv_name.clone();
        let latest_clone = latest.clone();
        let client_clone = client.clone();
        let connected = Arc::new(AtomicBool::new(false));
        let connected_for_task = connected.clone();
        monitor_connected = Some(connected);
        let request = monitor_request(&config);
        let join = tokio::spawn(async move {
            let mut backoff = INITIAL_BACKOFF;
            loop {
                let tx_inner = tx.clone();
                let latest_inner = latest_clone.clone();
                let connected_inner = connected_for_task.clone();
                let on_event = move |value: &PvField| {
                    connected_inner.store(true, Ordering::Release);
                    *latest_inner.lock() = Some(value.clone());
                    // Best effort: a full or closed channel must not stall the monitor.
                    let _ = tx_inner.try_send(value.clone());
                };
                let result = match &request {
                    Some(req) => {
                        client_clone
                            .pvmonitor_with_request(&pv_name, req, on_event)
                            .await
                    }
                    None => client_clone.pvmonitor(&pv_name, on_event).await,
                };
                // `swap` clears the flag and tells us whether this attempt ever
                // delivered an event, i.e. whether the subscription was live.
                let was_live = connected_for_task.swap(false, Ordering::AcqRel);
                if was_live {
                    backoff = INITIAL_BACKOFF;
                }
                match &result {
                    Ok(()) => tracing::debug!(
                        pv = %pv_name,
                        "pvalink: INP monitor ended, re-subscribing"
                    ),
                    Err(e) => tracing::warn!(
                        pv = %pv_name,
                        error = %e,
                        backoff_ms = backoff.as_millis() as u64,
                        "pvalink: INP monitor failed, will retry"
                    ),
                }
                tokio::time::sleep(backoff).await;
                backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
            }
        });
        monitor_abort = Some(MonitorAbort(join.abort_handle()));
    }
    Ok(Self {
        config,
        client,
        latest,
        monitor_connected,
        notify_rx: Mutex::new(notify_rx),
        put_queue: Mutex::new(Vec::new()),
        _monitor_abort: monitor_abort,
    })
}
/// Hands out the monitor notification receiver, leaving `None` behind; every
/// later call therefore returns `None`. Also `None` when the link was built
/// without a notification channel.
pub fn take_notify_rx(&self) -> Option<mpsc::Receiver<PvField>> {
self.notify_rx.lock().take()
}
/// Returns the configuration this link was created with.
pub fn config(&self) -> &PvaLinkConfig {
&self.config
}
/// Reads the link's configured field (`config.field`).
///
/// Equivalent to [`read_with_field`](Self::read_with_field) with the
/// configured field; the field name is borrowed rather than cloned.
pub async fn read(&self) -> PvaLinkResult<PvField> {
    self.read_with_field(&self.config.field).await
}
/// Reads `field` from the linked PV.
///
/// For a monitored link with a cached value the cache is used; otherwise a
/// full get is issued against `config.pv_name`. The requested field is then
/// extracted from the resulting value.
///
/// # Errors
///
/// `NotReadable` for an `Out` link, or the client error from the get.
pub async fn read_with_field(&self, field: &str) -> PvaLinkResult<PvField> {
    if let LinkDirection::Out = self.config.direction {
        return Err(PvaLinkError::NotReadable);
    }
    if self.config.monitor {
        // Copy the snapshot out so the lock is released before extracting.
        let cached = self.latest.lock().clone();
        if let Some(snapshot) = cached {
            return Ok(extract_field(&snapshot, field));
        }
    }
    let fetched = self.client.pvget_full(&self.config.pv_name).await?;
    Ok(extract_field(&fetched.value, field))
}
/// Returns the configured field (`config.field`) from the cached value
/// without touching the network.
///
/// Equivalent to
/// [`try_read_cached_with_field`](Self::try_read_cached_with_field) with the
/// configured field; the field name is borrowed rather than cloned.
pub fn try_read_cached(&self) -> Option<PvField> {
    self.try_read_cached_with_field(&self.config.field)
}
/// Returns `field` from the cached value without touching the network.
///
/// Yields `None` for `Out` links, for links without `monitor`, and when no
/// value has been cached yet.
pub fn try_read_cached_with_field(&self, field: &str) -> Option<PvField> {
    let is_out = matches!(self.config.direction, LinkDirection::Out);
    if is_out || !self.config.monitor {
        return None;
    }
    self.latest
        .lock()
        .clone()
        .map(|snapshot| extract_field(&snapshot, field))
}
/// Reads the configured field and converts it to `f64`.
///
/// # Errors
///
/// Any error from [`read`](Self::read), or `NotScalar` (carrying the
/// configured field name) when the value has no scalar interpretation.
pub async fn read_scalar_f64(&self) -> PvaLinkResult<f64> {
    let fetched = self.read().await?;
    match scalar_as_f64(&fetched) {
        Some(number) => Ok(number),
        None => Err(PvaLinkError::NotScalar(self.config.field.clone())),
    }
}
/// Writes a string value with `block = false`; see `write_with_block`.
pub async fn write(&self, value_str: &str) -> PvaLinkResult<()> {
self.write_with_block(value_str, false).await
}
/// Writes a string value to the linked PV.
///
/// * `Inp` links are rejected with `NotWritable`.
/// * With `config.defer` the value is only queued.
/// * Otherwise a put is issued, targeting `config.field` when it names a
///   sub-field and the root otherwise; `block` is forwarded in the request.
/// * A failed put is queued instead of reported when `config.retry` is set
///   and the error classifies as a disconnect.
///
/// # Errors
///
/// `NotWritable`, `RetryQueueFull`, or the client error wrapped in `Pva`.
pub async fn write_with_block(&self, value_str: &str, block: bool) -> PvaLinkResult<()> {
    let cfg = &self.config;
    if let LinkDirection::Inp = cfg.direction {
        return Err(PvaLinkError::NotWritable);
    }
    if cfg.defer {
        return self.enqueue_put(QueuedPut::Str(value_str.to_string()));
    }
    let request = build_put_request(cfg.process, block);
    let outcome = if is_subfield(&cfg.field) {
        self.client
            .pvput_field_with_request(&cfg.pv_name, &cfg.field, &request, value_str)
            .await
    } else {
        self.client
            .pvput_with_request(&cfg.pv_name, &request, value_str)
            .await
    };
    let Err(err) = outcome else {
        return Ok(());
    };
    if cfg.retry && is_disconnect(&err) {
        self.enqueue_put(QueuedPut::Str(value_str.to_string()))
    } else {
        Err(PvaLinkError::Pva(err))
    }
}
/// Writes a typed value with `block = false`; see
/// `write_pv_field_with_block`.
pub async fn write_pv_field(&self, value: &PvField) -> PvaLinkResult<()> {
self.write_pv_field_with_block(value, false).await
}
/// Writes a typed `PvField` value to the linked PV.
///
/// Mirrors the string write path: `Inp` links are rejected, `config.defer`
/// queues the value, otherwise a put is issued against either the configured
/// sub-field or the root. A failed put is queued when `config.retry` is set
/// and the error classifies as a disconnect.
///
/// # Errors
///
/// `NotWritable`, `RetryQueueFull`, or the client error wrapped in `Pva`.
pub async fn write_pv_field_with_block(
    &self,
    value: &PvField,
    block: bool,
) -> PvaLinkResult<()> {
    let cfg = &self.config;
    if let LinkDirection::Inp = cfg.direction {
        return Err(PvaLinkError::NotWritable);
    }
    if cfg.defer {
        return self.enqueue_put(QueuedPut::Field(value.clone()));
    }
    let request = build_put_request(cfg.process, block);
    let outcome = if is_subfield(&cfg.field) {
        self.client
            .pvput_pv_field_field_with_request(&cfg.pv_name, &cfg.field, &request, value)
            .await
    } else {
        self.client
            .pvput_pv_field_with_request(&cfg.pv_name, &request, value)
            .await
    };
    let Err(err) = outcome else {
        return Ok(());
    };
    if cfg.retry && is_disconnect(&err) {
        self.enqueue_put(QueuedPut::Field(value.clone()))
    } else {
        Err(PvaLinkError::Pva(err))
    }
}
/// Appends `value` to the pending-put queue.
///
/// # Errors
///
/// `RetryQueueFull` (carrying the current depth) once the queue already
/// holds `MAX_PUT_QUEUE` entries.
fn enqueue_put(&self, value: QueuedPut) -> PvaLinkResult<()> {
    let mut pending = self.put_queue.lock();
    let depth = pending.len();
    if depth < MAX_PUT_QUEUE {
        pending.push(value);
        Ok(())
    } else {
        Err(PvaLinkError::RetryQueueFull(depth))
    }
}
/// Number of puts currently waiting in the queue.
pub fn pending_put_count(&self) -> usize {
self.put_queue.lock().len()
}
/// Sends every queued put, in order, and returns how many were sent.
///
/// The queue is drained up front. Sending stops at the first failure:
///
/// * with `config.retry` set and a disconnect-class error, the failed entry
///   and everything after it go back to the front of the queue;
/// * otherwise the failed entry is discarded and only the entries after it
///   are put back.
///
/// Entries queued concurrently while flushing stay behind the restored ones.
///
/// # Errors
///
/// `NotWritable` for `Inp` links, or the first client error wrapped in `Pva`.
pub async fn flush_deferred(&self) -> PvaLinkResult<usize> {
    let cfg = &self.config;
    if let LinkDirection::Inp = cfg.direction {
        return Err(PvaLinkError::NotWritable);
    }
    let drained: Vec<QueuedPut> = std::mem::take(&mut *self.put_queue.lock());
    // Both depend only on the configuration, so compute them once.
    let request = build_put_request(cfg.process, false);
    let to_subfield = is_subfield(&cfg.field);
    let mut sent = 0usize;
    for (idx, entry) in drained.iter().enumerate() {
        let outcome = match (entry, to_subfield) {
            (QueuedPut::Str(text), true) => {
                self.client
                    .pvput_field_with_request(&cfg.pv_name, &cfg.field, &request, text)
                    .await
            }
            (QueuedPut::Str(text), false) => {
                self.client
                    .pvput_with_request(&cfg.pv_name, &request, text)
                    .await
            }
            (QueuedPut::Field(typed), true) => {
                self.client
                    .pvput_pv_field_field_with_request(
                        &cfg.pv_name,
                        &cfg.field,
                        &request,
                        typed,
                    )
                    .await
            }
            (QueuedPut::Field(typed), false) => {
                self.client
                    .pvput_pv_field_with_request(&cfg.pv_name, &request, typed)
                    .await
            }
        };
        let err = match outcome {
            Ok(()) => {
                sent += 1;
                continue;
            }
            Err(err) => err,
        };
        // A retryable disconnect keeps the failed entry; any other failure drops it.
        let resume_at = if cfg.retry && is_disconnect(&err) {
            idx
        } else {
            idx + 1
        };
        if resume_at < drained.len() {
            let mut live = self.put_queue.lock();
            let mut restored: Vec<QueuedPut> = drained[resume_at..].to_vec();
            restored.append(&mut live);
            *live = restored;
        }
        return Err(PvaLinkError::Pva(err));
    }
    Ok(sent)
}
/// Reports whether the link is considered connected.
///
/// Links with a monitor flag report that flag; links without one report
/// whether any value has been cached.
pub fn is_connected(&self) -> bool {
    if let Some(flag) = self.monitor_connected.as_ref() {
        return flag.load(Ordering::Acquire);
    }
    self.latest.lock().is_some()
}
/// Reads `alarm.severity` from the cached value as an `i32`.
///
/// Returns `None` when nothing is cached, when the cached value or its
/// `alarm` member is not a structure, or when `severity` is missing or not
/// a scalar. The cached value is inspected in place under the lock instead
/// of being deep-cloned for a single integer.
fn remote_alarm_severity(&self) -> Option<i32> {
    let guard = self.latest.lock();
    let Some(PvField::Structure(root)) = &*guard else {
        return None;
    };
    let PvField::Structure(alarm) = root.get_field("alarm")? else {
        return None;
    };
    match alarm.get_field("severity")? {
        PvField::Scalar(sv) => Some(scalar_value_to_f64(sv) as i32),
        _ => None,
    }
}
/// Remote alarm severity filtered by the configured `sevr` mode; see
/// `link_alarm_severity_with`.
pub fn link_alarm_severity(&self) -> Option<i32> {
self.link_alarm_severity_with(self.config.sevr)
}
/// Remote alarm severity filtered by `sevr`.
///
/// Returns the cached remote severity when `sevr.propagates` accepts it, and
/// `None` when it does not or when no severity is available.
pub fn link_alarm_severity_with(&self, sevr: super::config::SevrMode) -> Option<i32> {
    let severity = self.remote_alarm_severity()?;
    sevr.propagates(severity).then_some(severity)
}
/// Alarm message for the configured `sevr` mode; see `alarm_message_with`.
pub fn alarm_message(&self) -> Option<String> {
self.alarm_message_with(self.config.sevr)
}
/// Message describing the propagated remote alarm, if any.
///
/// Returns `None` when `sevr` does not propagate the cached severity or when
/// the cached value is not a structure. Otherwise returns the non-empty
/// `alarm.message` string, falling back to a synthetic
/// `remote severity <n>` text.
pub fn alarm_message_with(&self, sevr: super::config::SevrMode) -> Option<String> {
    let sev = self.link_alarm_severity_with(sevr)?;
    let PvField::Structure(root) = self.latest.lock().clone()? else {
        return None;
    };
    let remote_text = match root.get_field("alarm") {
        Some(PvField::Structure(alarm)) => match alarm.get_field("message") {
            Some(PvField::Scalar(ScalarValue::String(text))) if !text.is_empty() => {
                Some(text.clone())
            }
            _ => None,
        },
        _ => None,
    };
    Some(remote_text.unwrap_or_else(|| format!("remote severity {sev}")))
}
/// Returns a clone of the cached value, or `None` when nothing is cached.
pub fn latest_value(&self) -> Option<PvField> {
self.latest.lock().clone()
}
/// Reads `(secondsPastEpoch, nanoseconds)` from the cached value's
/// `timeStamp` structure.
///
/// Returns `None` when nothing is cached, when the expected structures or
/// members are missing, when a member has an unexpected scalar type, or when
/// an unsigned member does not fit the signed return type (previously such a
/// value wrapped silently into a negative number). The cached value is
/// inspected in place under the lock rather than cloned.
pub fn time_stamp(&self) -> Option<(i64, i32)> {
    let guard = self.latest.lock();
    let Some(PvField::Structure(root)) = &*guard else {
        return None;
    };
    let PvField::Structure(stamp) = root.get_field("timeStamp")? else {
        return None;
    };
    let secs = match stamp.get_field("secondsPastEpoch")? {
        PvField::Scalar(ScalarValue::Long(v)) => *v,
        PvField::Scalar(ScalarValue::ULong(v)) => i64::try_from(*v).ok()?,
        _ => return None,
    };
    let nsec = match stamp.get_field("nanoseconds")? {
        PvField::Scalar(ScalarValue::Int(v)) => *v,
        PvField::Scalar(ScalarValue::UInt(v)) => i32::try_from(*v).ok()?,
        _ => return None,
    };
    Some((secs, nsec))
}
/// Metadata snapshot for the configured field; see `link_metadata_with`.
pub fn link_metadata(&self) -> Option<epics_base_rs::server::database::LinkMetadata> {
self.link_metadata_with(&self.config.field)
}
/// Builds a metadata snapshot from the cached value.
///
/// The DBF type and element count are derived from `field`; limits,
/// precision, units and description are read from the `display`, `control`
/// and `valueAlarm` members of the cached root. Alarm limits are reported
/// only when at least one of the four is present; missing ones default to
/// `0.0`. Returns `None` when nothing is cached.
pub fn link_metadata_with(
    &self,
    field: &str,
) -> Option<epics_base_rs::server::database::LinkMetadata> {
    use epics_base_rs::server::database::LinkMetadata;
    let root = self.latest.lock().clone()?;
    let numeric = |path: &str| scalar_as_f64(&extract_field(&root, path));
    let value_field = extract_field(&root, field);
    let alarm_limits = match (
        numeric("valueAlarm.lowAlarmLimit"),
        numeric("valueAlarm.lowWarningLimit"),
        numeric("valueAlarm.highWarningLimit"),
        numeric("valueAlarm.highAlarmLimit"),
    ) {
        (None, None, None, None) => None,
        (lolo, lo, hi, hihi) => Some((
            lolo.unwrap_or(0.0),
            lo.unwrap_or(0.0),
            hi.unwrap_or(0.0),
            hihi.unwrap_or(0.0),
        )),
    };
    Some(LinkMetadata {
        dbf_type: link_dbf_type(&value_field),
        element_count: link_element_count(&value_field),
        graphic_limits: limit_pair(&root, "display.limitLow", "display.limitHigh"),
        control_limits: limit_pair(&root, "control.limitLow", "control.limitHigh"),
        alarm_limits,
        precision: numeric("display.precision").map(|p| p as i16),
        units: string_field(&root, "display.units"),
        description: string_field(&root, "display.description"),
    })
}
/// Test-only constructor: builds a link with a 1 second client timeout, the
/// given cached value, no monitor task, no monitor flag and no notification
/// channel.
#[cfg(test)]
pub(crate) fn for_test(config: PvaLinkConfig, cached: Option<PvField>) -> Self {
let client = PvaClient::builder().timeout(Duration::from_secs(1)).build();
Self {
_monitor_abort: None,
config,
client,
latest: Arc::new(Mutex::new(cached)),
monitor_connected: None,
notify_rx: Mutex::new(None),
put_queue: Mutex::new(Vec::new()),
}
}
/// Test-only constructor: builds a link around a caller-supplied client,
/// with nothing cached and no monitor machinery.
#[cfg(test)]
pub(crate) fn for_test_with_client(config: PvaLinkConfig, client: PvaClient) -> Self {
Self {
_monitor_abort: None,
config,
client,
latest: Arc::new(Mutex::new(None)),
monitor_connected: None,
notify_rx: Mutex::new(None),
put_queue: Mutex::new(Vec::new()),
}
}
/// Test-only constructor: like `for_test`, but installs a monitor-connected
/// flag (initially `false`) and returns a handle to it so a test can toggle
/// the connection state by hand.
#[cfg(test)]
pub(crate) fn for_test_with_monitor_flag(
config: PvaLinkConfig,
cached: Option<PvField>,
) -> (Self, Arc<AtomicBool>) {
let client = PvaClient::builder().timeout(Duration::from_secs(1)).build();
let flag = Arc::new(AtomicBool::new(false));
let link = Self {
_monitor_abort: None,
config,
client,
latest: Arc::new(Mutex::new(cached)),
monitor_connected: Some(flag.clone()),
notify_rx: Mutex::new(None),
put_queue: Mutex::new(Vec::new()),
};
(link, flag)
}
}
/// Classifies a client error as a connectivity problem (`true`) or a
/// value/encoding problem (`false`).
///
/// I/O errors, timeouts, unknown channels and refused connections always
/// count as disconnects. Protocol errors count only when their message
/// contains one of a few known phrases (case-insensitive). The match is
/// exhaustive on purpose so a new error variant forces a decision here.
fn is_disconnect(e: &epics_pva_rs::error::PvaError) -> bool {
    use epics_pva_rs::error::PvaError;
    const DISCONNECT_MARKERS: [&str; 3] = ["no servers found", "not connected", "disconnect"];
    match e {
        PvaError::InvalidValue(_) | PvaError::Decode(_) => false,
        PvaError::Protocol(msg) => {
            let lowered = msg.to_ascii_lowercase();
            DISCONNECT_MARKERS
                .iter()
                .any(|marker| lowered.contains(*marker))
        }
        PvaError::Io(_)
        | PvaError::Timeout
        | PvaError::ChannelNotFound(_)
        | PvaError::ConnectionRefused => true,
    }
}
/// Builds the pvRequest used for puts.
///
/// The request selects no fields and carries two record options:
/// `process` (`"true"` or `"passive"`) and `block` (`"true"` or `"false"`).
fn build_put_request(process: bool, block: bool) -> PvRequestExpr {
    let process_mode = if process { "true" } else { "passive" };
    let block_mode = if block { "true" } else { "false" };
    let record_options = [("process", process_mode), ("block", block_mode)]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();
    PvRequestExpr {
        fields: vec![],
        record_options,
        field_options: vec![],
    }
}
/// Returns `true` when `field` names something other than the default
/// target, i.e. it is neither empty nor exactly `"value"`.
fn is_subfield(field: &str) -> bool {
    !matches!(field, "" | "value")
}
/// Builds the pvRequest used for monitor subscriptions.
///
/// Always returns `Some`. The request carries three record options, in this
/// order: `pipeline` (from the configuration), `atomic` (always `"true"`)
/// and `queueSize` (the configured size, at least 1).
fn monitor_request(config: &PvaLinkConfig) -> Option<epics_pva_rs::pv_request::PvRequestExpr> {
    let pipeline = if config.pipeline { "true" } else { "false" };
    let queue_size = config.queue_size.max(1).to_string();
    let mut req = epics_pva_rs::pv_request::PvRequestExpr::default();
    req.record_options.extend([
        ("pipeline".to_string(), pipeline.to_string()),
        ("atomic".to_string(), "true".to_string()),
        ("queueSize".to_string(), queue_size),
    ]);
    Some(req)
}
/// Resolves a dot-separated `path` inside `root` and returns a clone of the
/// value found.
///
/// * An empty path yields `root` itself.
/// * A segment that is missing from a structure yields `PvField::Null`.
/// * Reaching a non-structure value before the path is exhausted yields that
///   value (so a bare scalar root answers any path with itself).
///
/// The walk is done by reference and only the final value is cloned; the
/// results are the same as cloning the whole tree first, without the cost.
fn extract_field(root: &PvField, path: &str) -> PvField {
    if path.is_empty() {
        return root.clone();
    }
    let mut cursor = root;
    for segment in path.split('.') {
        let PvField::Structure(s) = cursor else {
            // Cannot descend further: hand back what we reached.
            return cursor.clone();
        };
        match s.get_field(segment) {
            Some(child) => cursor = child,
            // `Null` is not a structure, so any remaining segments would
            // return it unchanged anyway.
            None => return PvField::Null,
        }
    }
    cursor.clone()
}
/// Converts a field to `f64` when it is a scalar, or a structure whose
/// `get_value()` yields a scalar; returns `None` for anything else.
fn scalar_as_f64(field: &PvField) -> Option<f64> {
    let scalar = match field {
        PvField::Scalar(sv) => Some(sv),
        PvField::Structure(s) => s.get_value(),
        _ => None,
    };
    scalar.map(scalar_value_to_f64)
}
/// Converts any scalar value to `f64`.
///
/// Booleans map to `1.0` / `0.0`, numeric variants are cast, and strings are
/// parsed as a float with `0.0` as the fallback for unparsable text.
fn scalar_value_to_f64(v: &ScalarValue) -> f64 {
    match v {
        ScalarValue::Double(x) => *x,
        ScalarValue::Float(x) => *x as f64,
        ScalarValue::Byte(x) => *x as f64,
        ScalarValue::UByte(x) => *x as f64,
        ScalarValue::Short(x) => *x as f64,
        ScalarValue::UShort(x) => *x as f64,
        ScalarValue::Int(x) => *x as f64,
        ScalarValue::UInt(x) => *x as f64,
        ScalarValue::Long(x) => *x as f64,
        ScalarValue::ULong(x) => *x as f64,
        ScalarValue::Boolean(flag) => f64::from(u8::from(*flag)),
        ScalarValue::String(text) => text.parse::<f64>().unwrap_or_default(),
    }
}
/// Maps a PVA value to the DBF type reported in link metadata.
///
/// * Scalars map by variant; booleans are reported as `Long`.
/// * Untyped scalar arrays map by their first element (`None` when empty);
///   typed arrays map by their declared element type.
/// * A structure with an integer `index` and an array `choices` is an enum;
///   any other structure is classified by its `value` member.
/// * Everything else yields `None`.
fn link_dbf_type(value_field: &PvField) -> Option<epics_base_rs::server::database::LinkDbfType> {
    use epics_base_rs::server::database::LinkDbfType;

    // DBF type for a single scalar value.
    fn scalar_kind(sv: &ScalarValue) -> Option<LinkDbfType> {
        Some(match sv {
            ScalarValue::Byte(_) => LinkDbfType::Char,
            ScalarValue::UByte(_) => LinkDbfType::UChar,
            ScalarValue::Short(_) => LinkDbfType::Short,
            ScalarValue::UShort(_) => LinkDbfType::UShort,
            ScalarValue::Int(_) => LinkDbfType::Long,
            ScalarValue::UInt(_) => LinkDbfType::ULong,
            ScalarValue::Long(_) => LinkDbfType::Int64,
            ScalarValue::ULong(_) => LinkDbfType::UInt64,
            ScalarValue::Float(_) => LinkDbfType::Float,
            ScalarValue::Double(_) => LinkDbfType::Double,
            ScalarValue::String(_) => LinkDbfType::String,
            ScalarValue::Boolean(_) => LinkDbfType::Long,
        })
    }

    match value_field {
        PvField::Scalar(sv) => scalar_kind(sv),
        PvField::ScalarArray(arr) => arr.first().and_then(scalar_kind),
        PvField::ScalarArrayTyped(arr) => {
            use epics_pva_rs::pvdata::ScalarType;
            let kind = match arr.scalar_type() {
                ScalarType::Byte => LinkDbfType::Char,
                ScalarType::UByte => LinkDbfType::UChar,
                ScalarType::Short => LinkDbfType::Short,
                ScalarType::UShort => LinkDbfType::UShort,
                ScalarType::Int => LinkDbfType::Long,
                ScalarType::UInt => LinkDbfType::ULong,
                ScalarType::Long => LinkDbfType::Int64,
                ScalarType::ULong => LinkDbfType::UInt64,
                ScalarType::Float => LinkDbfType::Float,
                ScalarType::Double => LinkDbfType::Double,
                ScalarType::String => LinkDbfType::String,
                ScalarType::Boolean => LinkDbfType::Long,
            };
            Some(kind)
        }
        PvField::Structure(s) => {
            let integer_index = matches!(
                s.get_field("index"),
                Some(PvField::Scalar(
                    ScalarValue::Byte(_)
                        | ScalarValue::UByte(_)
                        | ScalarValue::Short(_)
                        | ScalarValue::UShort(_)
                        | ScalarValue::Int(_)
                        | ScalarValue::UInt(_)
                        | ScalarValue::Long(_)
                        | ScalarValue::ULong(_)
                ))
            );
            let array_choices = matches!(
                s.get_field("choices"),
                Some(PvField::ScalarArray(_) | PvField::ScalarArrayTyped(_))
            );
            if integer_index && array_choices {
                return Some(LinkDbfType::Enum);
            }
            s.get_field("value").and_then(link_dbf_type)
        }
        _ => None,
    }
}
/// Number of elements a value represents.
///
/// Scalars count as 1 and arrays as their length. A structure defers to its
/// `value` member when it has one, counts as 1 when it has only an `index`
/// member, and is otherwise unknown. Any other field kind is unknown.
fn link_element_count(value_field: &PvField) -> Option<i64> {
    match value_field {
        PvField::Scalar(_) => Some(1),
        PvField::ScalarArray(arr) => Some(arr.len() as i64),
        PvField::ScalarArrayTyped(arr) => Some(arr.len() as i64),
        PvField::Structure(s) => {
            if let Some(inner) = s.get_field("value") {
                return link_element_count(inner);
            }
            s.get_field("index").map(|_| 1)
        }
        _ => None,
    }
}
/// Reads a low/high limit pair from `root`.
///
/// Returns `None` only when both limits are unavailable; a single missing
/// limit is reported as `0.0`.
fn limit_pair(root: &PvField, lo_path: &str, hi_path: &str) -> Option<(f64, f64)> {
    let low = scalar_as_f64(&extract_field(root, lo_path));
    let high = scalar_as_f64(&extract_field(root, hi_path));
    if low.is_none() && high.is_none() {
        return None;
    }
    Some((low.unwrap_or(0.0), high.unwrap_or(0.0)))
}
/// Reads a non-empty string at `path`; returns `None` when the value is
/// missing, not a string scalar, or empty.
fn string_field(root: &PvField, path: &str) -> Option<String> {
    let PvField::Scalar(ScalarValue::String(text)) = extract_field(root, path) else {
        return None;
    };
    if text.is_empty() { None } else { Some(text) }
}
// No-op that only names `PvStructure` in non-test code. NOTE(review):
// presumably here so the `PvStructure` import (otherwise used only by the
// test module) does not trigger an unused-import warning — confirm.
#[allow(dead_code)]
fn _suppress(_: &PvStructure) {}
#[cfg(test)]
mod tests {
use super::*;
// `extract_field` returns a top-level member of a structure by name.
#[test]
fn extract_top_level_value() {
let mut s = PvStructure::new("epics:nt/NTScalar:1.0");
s.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(1.5))));
let root = PvField::Structure(s);
let v = extract_field(&root, "value");
match v {
PvField::Scalar(ScalarValue::Double(d)) => assert_eq!(d, 1.5),
other => panic!("got {other:?}"),
}
}
// `extract_field` follows a dotted path into a nested structure.
#[test]
fn extract_nested_field() {
let mut alarm = PvStructure::new("alarm_t");
alarm
.fields
.push(("severity".into(), PvField::Scalar(ScalarValue::Int(2))));
let mut root = PvStructure::new("epics:nt/NTScalar:1.0");
root.fields
.push(("alarm".into(), PvField::Structure(alarm)));
let value = extract_field(&PvField::Structure(root), "alarm.severity");
assert!(matches!(value, PvField::Scalar(ScalarValue::Int(2))));
}
// A member that does not exist resolves to `PvField::Null`.
#[test]
fn missing_field_returns_null() {
let s = PvStructure::new("epics:nt/NTScalar:1.0");
let v = extract_field(&PvField::Structure(s), "nope");
assert!(matches!(v, PvField::Null));
}
use super::super::config::LinkDirection;
use super::super::config::{PvaLinkConfig, SevrMode};
// Fixture: structure with `value = 7.0` and an `alarm` member carrying the
// given severity and, when provided, a message.
fn nt_with_alarm(severity: i32, message: Option<&str>) -> PvField {
let mut alarm = PvStructure::new("alarm_t");
alarm.fields.push((
"severity".into(),
PvField::Scalar(ScalarValue::Int(severity)),
));
if let Some(m) = message {
alarm.fields.push((
"message".into(),
PvField::Scalar(ScalarValue::String(m.to_string())),
));
}
let mut root = PvStructure::new("epics:nt/NTScalar:1.0");
root.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(7.0))));
root.fields
.push(("alarm".into(), PvField::Structure(alarm)));
PvField::Structure(root)
}
// Fixture: monitored INP link configuration for PV "X" with the given
// severity mode.
fn inp_cfg(sevr: SevrMode) -> PvaLinkConfig {
PvaLinkConfig {
monitor: true,
sevr,
..PvaLinkConfig::defaults_for("X", LinkDirection::Inp)
}
}
// With `SevrMode::Nms`, severities 1..=3 yield neither a severity nor a
// message.
#[test]
fn b2_nms_drops_all_severities() {
for sev in 1..=3 {
let link = PvaLink::for_test(
inp_cfg(SevrMode::Nms),
Some(nt_with_alarm(sev, Some("bad"))),
);
assert_eq!(link.link_alarm_severity(), None, "sev={sev}");
assert_eq!(link.alarm_message(), None, "sev={sev}");
}
}
// With `SevrMode::Ms`, severity 0 is dropped while severities 1..=3 are
// reported together with the remote message.
#[test]
fn b2_ms_propagates_any_nonzero_severity() {
let ok = PvaLink::for_test(inp_cfg(SevrMode::Ms), Some(nt_with_alarm(0, None)));
assert_eq!(ok.link_alarm_severity(), None);
assert_eq!(ok.alarm_message(), None);
for sev in 1..=3 {
let link = PvaLink::for_test(
inp_cfg(SevrMode::Ms),
Some(nt_with_alarm(sev, Some("oops"))),
);
assert_eq!(link.link_alarm_severity(), Some(sev), "sev={sev}");
assert_eq!(link.alarm_message(), Some("oops".to_string()), "sev={sev}");
}
}
// With `SevrMode::Msi`, severities 1 and 2 are dropped and only severity 3
// is reported.
#[test]
fn b2_msi_propagates_only_invalid() {
let minor = PvaLink::for_test(inp_cfg(SevrMode::Msi), Some(nt_with_alarm(1, Some("m"))));
assert_eq!(minor.link_alarm_severity(), None);
let major = PvaLink::for_test(inp_cfg(SevrMode::Msi), Some(nt_with_alarm(2, Some("m"))));
assert_eq!(major.link_alarm_severity(), None);
let invalid =
PvaLink::for_test(inp_cfg(SevrMode::Msi), Some(nt_with_alarm(3, Some("dead"))));
assert_eq!(invalid.link_alarm_severity(), Some(3));
assert_eq!(invalid.alarm_message(), Some("dead".to_string()));
}
// Without an `alarm.message` member the message falls back to the synthetic
// "remote severity <n>" text.
#[test]
fn b2_synthetic_message_when_no_alarm_message_field() {
let link = PvaLink::for_test(inp_cfg(SevrMode::Ms), Some(nt_with_alarm(2, None)));
assert_eq!(link.link_alarm_severity(), Some(2));
assert_eq!(link.alarm_message(), Some("remote severity 2".to_string()));
}
// A link with nothing cached reports no severity and no message.
#[test]
fn b2_no_cached_value_means_no_alarm() {
let link = PvaLink::for_test(inp_cfg(SevrMode::Ms), None);
assert_eq!(link.link_alarm_severity(), None);
assert_eq!(link.alarm_message(), None);
}
// A default configuration still produces a monitor request carrying
// pipeline=false, atomic=true and queueSize=4.
#[test]
fn b4_monitor_request_always_carries_pvxs_options() {
let cfg = PvaLinkConfig::defaults_for("X", LinkDirection::Inp);
let req = monitor_request(&cfg).expect("BR-R43: defaults still yield a request");
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "pipeline" && v == "false")
);
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "atomic" && v == "true"),
"BR-R43: atomic must be hard-coded true on remote pvalink monitor requests"
);
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "queueSize" && v == "4"),
"BR-R43: queueSize must default to pvxs's 4 on no-options links"
);
}
// A configured queue size is forwarded as the `queueSize` record option.
#[test]
fn b4_monitor_request_carries_queue_size() {
let cfg = PvaLinkConfig {
queue_size: 16,
..PvaLinkConfig::defaults_for("X", LinkDirection::Inp)
};
let req = monitor_request(&cfg).expect("non-default Q yields a request");
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "queueSize" && v == "16")
);
}
// `pipeline: true` is forwarded as pipeline=true, and `queueSize` is still
// present.
#[test]
fn b4_monitor_request_carries_pipeline() {
let cfg = PvaLinkConfig {
pipeline: true,
..PvaLinkConfig::defaults_for("X", LinkDirection::Inp)
};
let req = monitor_request(&cfg).expect("pipeline yields a request");
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "pipeline" && v == "true")
);
assert!(req.record_options.iter().any(|(k, _)| k == "queueSize"));
}
// Fixture: OUT link configuration for PV "X" with the given defer/retry
// flags.
fn out_cfg(defer: bool, retry: bool) -> PvaLinkConfig {
PvaLinkConfig {
defer,
retry,
..PvaLinkConfig::defaults_for("X", LinkDirection::Out)
}
}
// With `defer` set, both string and typed writes succeed and only grow the
// pending queue.
#[tokio::test]
async fn b4_defer_queues_instead_of_putting() {
let link = PvaLink::for_test(out_cfg(true, false), None);
assert_eq!(link.pending_put_count(), 0);
link.write("42").await.expect("deferred write is Ok");
assert_eq!(link.pending_put_count(), 1);
link.write_pv_field(&PvField::Scalar(ScalarValue::Double(1.0)))
.await
.expect("deferred typed write is Ok");
assert_eq!(link.pending_put_count(), 2);
}
// Deferred puts keep the form they were submitted in: a string write queues
// `QueuedPut::Str`, a typed write queues `QueuedPut::Field`, in order.
#[tokio::test]
async fn minor_deferred_string_put_keeps_string_form() {
let link = PvaLink::for_test(out_cfg(true, false), None);
link.write("42").await.unwrap();
link.write_pv_field(&PvField::Scalar(ScalarValue::Double(1.0)))
.await
.unwrap();
let q = link.put_queue.lock();
assert_eq!(q.len(), 2);
match &q[0] {
QueuedPut::Str(s) => assert_eq!(s, "42"),
other => panic!("string write must queue QueuedPut::Str, got {other:?}"),
}
match &q[1] {
QueuedPut::Field(PvField::Scalar(ScalarValue::Double(d))) => assert_eq!(*d, 1.0),
other => panic!("typed write must queue QueuedPut::Field, got {other:?}"),
}
}
// With `retry` set, a write returns Ok and leaves one entry queued.
// NOTE(review): relies on the put against PV "X" failing with a
// disconnect-class error in the test environment.
#[tokio::test]
async fn b4_retry_queues_on_disconnect() {
let link = PvaLink::for_test(out_cfg(false, true), None);
let r = link.write("7").await;
assert!(r.is_ok(), "retry write should queue, got {r:?}");
assert_eq!(link.pending_put_count(), 1);
}
// Without `retry`, a failing write surfaces the error and queues nothing.
// NOTE(review): relies on the put against PV "X" failing in the test
// environment.
#[tokio::test]
async fn b4_no_retry_surfaces_disconnect_error() {
let link = PvaLink::for_test(out_cfg(false, false), None);
let r = link.write("7").await;
assert!(r.is_err(), "non-retry write must error on disconnect");
assert_eq!(link.pending_put_count(), 0);
}
// The queue accepts exactly `MAX_PUT_QUEUE` deferred writes; the next one is
// rejected with `RetryQueueFull`.
#[tokio::test]
async fn b4_retry_queue_full_rejects() {
let link = PvaLink::for_test(out_cfg(true, false), None);
for _ in 0..MAX_PUT_QUEUE {
link.write("1").await.expect("within capacity");
}
assert_eq!(link.pending_put_count(), MAX_PUT_QUEUE);
let overflow = link.write("1").await;
assert!(matches!(overflow, Err(PvaLinkError::RetryQueueFull(_))));
}
// Without `retry`, a failing flush drops the entry that failed and restores
// only the entries after it (2 queued -> 1 left).
#[tokio::test]
async fn b4_flush_deferred_replays_when_still_disconnected() {
let link = PvaLink::for_test(out_cfg(true, false), None);
link.write("1").await.unwrap();
link.write("2").await.unwrap();
assert_eq!(link.pending_put_count(), 2);
let r = link.flush_deferred().await;
assert!(r.is_err());
assert_eq!(link.pending_put_count(), 1);
}
// With `retry`, a flush that fails restores the failed entry as well
// (2 queued -> 2 left).
#[tokio::test]
async fn b4_flush_deferred_retry_restores_unsent_tail() {
let link = PvaLink::for_test(out_cfg(true, true), None);
link.write("1").await.unwrap();
link.write("2").await.unwrap();
let r = link.flush_deferred().await;
assert!(r.is_err(), "still disconnected");
assert_eq!(link.pending_put_count(), 2);
}
// Flushing an INP link is rejected with `NotWritable`.
#[tokio::test]
async fn b4_flush_on_inp_link_rejected() {
let link = PvaLink::for_test(inp_cfg(SevrMode::Nms), None);
assert!(matches!(
link.flush_deferred().await,
Err(PvaLinkError::NotWritable)
));
}
// A link built by `for_test` has no notification receiver to hand out.
// NOTE(review): despite the name, this does not exercise a second take on a
// link that actually owns a receiver.
#[test]
fn b3_take_notify_rx_only_once() {
let link = PvaLink::for_test(inp_cfg(SevrMode::Nms), None);
assert!(link.take_notify_rx().is_none());
}
// With a monitor flag installed, `is_connected` follows the flag and ignores
// the presence of a (possibly stale) cached value.
#[test]
fn bug2_is_connected_reflects_monitor_disconnect() {
let (link, flag) = PvaLink::for_test_with_monitor_flag(
inp_cfg(SevrMode::Nms),
Some(PvField::Scalar(ScalarValue::Double(1.0))),
);
assert!(
!link.is_connected(),
"cached value alone must NOT report connected"
);
flag.store(true, Ordering::Release);
assert!(link.is_connected(), "live subscription reports connected");
flag.store(false, Ordering::Release);
assert!(
link.latest_value().is_some(),
"stale value is still cached after disconnect"
);
assert!(
!link.is_connected(),
"disconnect must be reflected despite the stale cached value"
);
flag.store(true, Ordering::Release);
assert!(link.is_connected(), "re-subscribe restores connected");
}
// `open` on a monitored INP link installs the connection flag, which starts
// out as not connected.
#[tokio::test]
async fn bug2_inp_monitor_link_installs_connection_flag() {
let cfg = PvaLinkConfig {
monitor: true,
..PvaLinkConfig::defaults_for("BUG2:NOPV", LinkDirection::Inp)
};
let link = PvaLink::open(cfg).await.expect("open INP monitor link");
assert!(
link.monitor_connected.is_some(),
"INP+monitor link must install the live-connection flag"
);
assert!(
!link.is_connected(),
"no event delivered yet → not connected"
);
}
// Timeout, refused connection and unknown channel classify as disconnects;
// invalid value, decode errors and an unrelated protocol message do not.
#[test]
fn b4_is_disconnect_classification() {
use epics_pva_rs::error::PvaError;
assert!(is_disconnect(&PvaError::Timeout));
assert!(is_disconnect(&PvaError::ConnectionRefused));
assert!(is_disconnect(&PvaError::ChannelNotFound("x".into())));
assert!(!is_disconnect(&PvaError::InvalidValue("x".into())));
assert!(!is_disconnect(&PvaError::Protocol("x".into())));
assert!(!is_disconnect(&PvaError::Decode("x".into())));
}
// Fixture: `display` structure with limits, units, description and
// precision.
fn nt_display(lo: f64, hi: f64, units: &str, desc: &str, prec: i32) -> PvField {
let mut d = PvStructure::new("");
d.fields
.push(("limitLow".into(), PvField::Scalar(ScalarValue::Double(lo))));
d.fields
.push(("limitHigh".into(), PvField::Scalar(ScalarValue::Double(hi))));
d.fields.push((
"units".into(),
PvField::Scalar(ScalarValue::String(units.to_string())),
));
d.fields.push((
"description".into(),
PvField::Scalar(ScalarValue::String(desc.to_string())),
));
d.fields
.push(("precision".into(), PvField::Scalar(ScalarValue::Int(prec))));
PvField::Structure(d)
}
// Fixture: `control` structure with low/high limits.
fn nt_control(lo: f64, hi: f64) -> PvField {
let mut c = PvStructure::new("");
c.fields
.push(("limitLow".into(), PvField::Scalar(ScalarValue::Double(lo))));
c.fields
.push(("limitHigh".into(), PvField::Scalar(ScalarValue::Double(hi))));
PvField::Structure(c)
}
// Fixture: `valueAlarm` structure with the four alarm/warning limits.
fn nt_value_alarm(lolo: f64, lo: f64, hi: f64, hihi: f64) -> PvField {
let mut v = PvStructure::new("");
v.fields.push((
"lowAlarmLimit".into(),
PvField::Scalar(ScalarValue::Double(lolo)),
));
v.fields.push((
"lowWarningLimit".into(),
PvField::Scalar(ScalarValue::Double(lo)),
));
v.fields.push((
"highWarningLimit".into(),
PvField::Scalar(ScalarValue::Double(hi)),
));
v.fields.push((
"highAlarmLimit".into(),
PvField::Scalar(ScalarValue::Double(hihi)),
));
PvField::Structure(v)
}
// `link_metadata` reports type, element count, graphic/control/alarm limits,
// precision, units and description taken from the cached structure.
#[test]
fn br_r24_link_metadata_surfaces_remote_display_control_valuealarm() {
use epics_base_rs::server::database::LinkDbfType;
let mut root = PvStructure::new("epics:nt/NTScalar:1.0");
root.fields
.push(("value".into(), PvField::Scalar(ScalarValue::Double(1.0))));
root.fields
.push(("display".into(), nt_display(-9.0, 9.0, "arb", "linked", 2)));
root.fields
.push(("control".into(), nt_control(-10.0, 10.0)));
root.fields
.push(("valueAlarm".into(), nt_value_alarm(-8.0, -7.0, 7.0, 8.0)));
let link = PvaLink::for_test(inp_cfg(SevrMode::Nms), Some(PvField::Structure(root)));
let meta = link
.link_metadata()
.expect("connected link must expose metadata");
assert_eq!(meta.dbf_type, Some(LinkDbfType::Double), "DBF type");
assert_eq!(meta.element_count, Some(1), "scalar element count");
assert_eq!(meta.graphic_limits, Some((-9.0, 9.0)), "graphic limits");
assert_eq!(meta.control_limits, Some((-10.0, 10.0)), "control limits");
assert_eq!(
meta.alarm_limits,
Some((-8.0, -7.0, 7.0, 8.0)),
"alarm limits (lolo, lo, hi, hihi)"
);
assert_eq!(meta.precision, Some(2), "display precision");
assert_eq!(meta.units.as_deref(), Some("arb"), "display units");
assert_eq!(
meta.description.as_deref(),
Some("linked"),
"display description"
);
}
// Two checks: no cached value means no metadata, and a `value` structure
// with an integer `index` plus array `choices` is reported as an enum with
// element count 1.
#[test]
fn br_r24_link_metadata_none_when_disconnected_and_enum_maps_to_dbf_enum() {
use epics_base_rs::server::database::LinkDbfType;
let disconnected = PvaLink::for_test(inp_cfg(SevrMode::Nms), None);
assert!(
disconnected.link_metadata().is_none(),
"no cached value → no metadata snapshot"
);
let mut enum_value = PvStructure::new("enum_t");
enum_value
.fields
.push(("index".into(), PvField::Scalar(ScalarValue::Int(1))));
enum_value.fields.push((
"choices".into(),
PvField::ScalarArray(vec![
ScalarValue::String("OFF".into()),
ScalarValue::String("ON".into()),
]),
));
let mut root = PvStructure::new("epics:nt/NTEnum:1.0");
root.fields
.push(("value".into(), PvField::Structure(enum_value)));
let link = PvaLink::for_test(inp_cfg(SevrMode::Nms), Some(PvField::Structure(root)));
let meta = link.link_metadata().expect("connected");
assert_eq!(meta.dbf_type, Some(LinkDbfType::Enum), "NTEnum → DBF_ENUM");
assert_eq!(meta.element_count, Some(1), "enum index element count");
}
// Covers the put-request options (`process`, `block`), the sub-field
// classification, and that a deferred `write_with_block` enqueues one entry.
#[test]
fn br_r11_pvalink_out_options_preserved() {
let req = build_put_request(true, false);
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "process" && v == "true"),
"proc=PP must produce process=true in pvRequest"
);
let req = build_put_request(false, false);
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "process" && v == "passive"),
"proc=Default must produce process=passive in pvRequest"
);
let req = build_put_request(true, true);
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "block" && v == "true"),
"block=true must appear in pvRequest"
);
let req = build_put_request(false, false);
assert!(
req.record_options
.iter()
.any(|(k, v)| k == "block" && v == "false"),
"block=false must appear in pvRequest"
);
assert!(!is_subfield(""), "empty field is not a sub-field");
assert!(!is_subfield("value"), "\"value\" is not a sub-field");
assert!(
is_subfield("DESC"),
"\"DESC\" must be treated as a sub-field"
);
assert!(is_subfield("alarm.severity"), "dotted path is a sub-field");
let cfg = PvaLinkConfig {
field: "DESC".to_string(),
process: true,
defer: true,
..PvaLinkConfig::defaults_for("BR11:PV", LinkDirection::Out)
};
let link = PvaLink::for_test(cfg, None);
// This is a plain `#[test]`, so a current-thread runtime is built by hand
// to drive the async write.
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
.block_on(async {
link.write_with_block("hello", true)
.await
.expect("deferred write_with_block must enqueue");
});
assert_eq!(link.pending_put_count(), 1, "one entry queued");
}
// End-to-end check against an isolated test PVA server: a typed write
// through a link whose field is `aux` must update `aux` and leave `value`
// unchanged.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn mr_r4_typed_field_put_targets_subfield() {
use epics_pva_rs::pvdata::{FieldDesc, ScalarType};
use epics_pva_rs::server_native::{PvaServer, SharedPV, SharedSource};
let desc = FieldDesc::Structure {
struct_id: "structure".into(),
fields: vec![
("value".into(), FieldDesc::ScalarArray(ScalarType::Long)),
("aux".into(), FieldDesc::ScalarArray(ScalarType::Long)),
],
};
let initial = PvField::Structure(PvStructure {
struct_id: "structure".into(),
fields: vec![
(
"value".into(),
PvField::ScalarArray(vec![ScalarValue::Long(1), ScalarValue::Long(2)]),
),
(
"aux".into(),
PvField::ScalarArray(vec![ScalarValue::Long(7), ScalarValue::Long(8)]),
),
],
});
let pv = SharedPV::new();
pv.open(desc, initial);
let source = SharedSource::new();
source.add("MR_R4:PV", pv.clone());
let server = PvaServer::isolated(Arc::new(source)).expect("test PVA server must start");
let addr = server.tcp_addr();
let client = PvaClient::builder()
.server_addr(addr)
.timeout(Duration::from_secs(3))
.build();
let cfg = PvaLinkConfig {
field: "aux".to_string(),
..PvaLinkConfig::defaults_for("MR_R4:PV", LinkDirection::Out)
};
let link = PvaLink::for_test_with_client(cfg, client);
link.write_pv_field(&PvField::ScalarArray(vec![
ScalarValue::Long(100),
ScalarValue::Long(200),
ScalarValue::Long(300),
]))
.await
.expect("typed field-targeted write must succeed");
// NOTE(review): presumably gives the server time to apply the put before
// the PV is inspected — confirm whether the put already guarantees this.
tokio::time::sleep(Duration::from_millis(80)).await;
let current = pv.current().expect("PV has a current value");
let PvField::Structure(s) = current else {
panic!("expected structure value");
};
// Flattens either array representation into plain `i64`s for comparison.
fn longs(field: &PvField) -> Vec<i64> {
let scalars = match field {
PvField::ScalarArray(v) => v.clone(),
PvField::ScalarArrayTyped(t) => t.to_scalar_values(),
other => panic!("expected an array field, got {other:?}"),
};
scalars
.into_iter()
.map(|sv| match sv {
ScalarValue::Long(x) => x,
other => panic!("expected Long element, got {other:?}"),
})
.collect()
}
let aux = s.get_field("aux").expect("aux sub-field present");
assert_eq!(
longs(aux),
vec![100, 200, 300],
"typed write must update the `aux` sub-field"
);
let value = s.get_field("value").expect("value sub-field present");
assert_eq!(
longs(value),
vec![1, 2],
"root `value` must be untouched by a field-targeted write"
);
}
}