use std::collections::HashMap;
/// Direction tag attached to a parsed link.
///
/// The value is supplied by the caller of [`PvaLinkConfig::parse`] and stored
/// unchanged in [`PvaLinkConfig::direction`]; the parser in this file does not
/// branch on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LinkDirection {
/// NOTE(review): presumably an input link (value read from the remote PV) — confirm against callers.
Inp,
/// NOTE(review): presumably an output link (value written to the remote PV) — confirm against callers.
Out,
}
/// Policy deciding which remote alarm severities a link propagates.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SevrMode {
    /// Never propagate a remote severity. This is the default.
    #[default]
    Nms,
    /// Propagate every remote severity other than `0`.
    Ms,
    /// Propagate only remote severity `3`.
    Msi,
}

impl SevrMode {
    /// Reports whether `remote_severity` should be propagated under this mode.
    ///
    /// * `Nms` — always `false`.
    /// * `Ms` — `true` for any severity different from `0`.
    /// * `Msi` — `true` only for severity `3`.
    pub fn propagates(self, remote_severity: i32) -> bool {
        // Severity codes the policies distinguish.
        const NO_ALARM: i32 = 0;
        const INVALID_ALARM: i32 = 3;

        match (self, remote_severity) {
            (Self::Nms, _) => false,
            (Self::Ms, severity) => severity != NO_ALARM,
            (Self::Msi, severity) => severity == INVALID_ALARM,
        }
    }
}
/// Settings extracted from a `pva://` link string by [`PvaLinkConfig::parse`].
///
/// Every field has a default supplied by [`PvaLinkConfig::defaults_for`]; the
/// per-field notes below name the query option or legacy modifier that
/// overrides it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PvaLinkConfig {
/// PV name: the link text after the scheme, up to the first `?` or whitespace.
pub pv_name: String,
/// Set by the `field` option; defaults to `"value"`.
pub field: String,
/// Set by the `monitor` option; forced to `true` by `CP`/`CPP`.
pub monitor: bool,
/// Set by the `proc` option (`TRUE`/`PP`/`PASSIVE` vs `FALSE`/`NPP`) or the legacy `PP`/`NPP` modifiers.
pub process: bool,
/// Set by the `notify` option.
pub notify: bool,
/// Set by the `scan_on_update` option; forced to `true` by `CP`/`CPP`.
pub scan_on_update: bool,
/// Set to `true` by `CPP` and to `false` by `CP`; there is no dedicated query option.
pub scan_on_passive: bool,
/// Set by the `always` option.
pub always: bool,
/// Set by the `sevr` option or the legacy `MS`/`MSI`/`MSS`/`NMS` modifiers; defaults to `SevrMode::Nms`.
pub sevr: SevrMode,
/// Set by the `Q` option (or `queueSize` when `Q` is absent); values below 1 are raised to 1.
/// Defaults to `DEFAULT_QUEUE_SIZE`.
pub queue_size: usize,
/// Set by the `pipeline` option.
pub pipeline: bool,
/// Set by the `defer` option.
pub defer: bool,
/// Set by the `retry` option.
pub retry: bool,
/// Set by the `local` option.
pub local: bool,
/// Set by the `atomic` option.
pub atomic: bool,
/// Set by the `monorder` option, clamped to `-1024..=1024`; defaults to `0`.
pub monorder: i32,
/// Set by the `time` option.
pub time: bool,
/// Direction passed in by the caller of `parse` / `defaults_for`.
pub direction: LinkDirection,
}
/// Queue size assigned by `PvaLinkConfig::defaults_for`, i.e. the value used
/// when a link carries neither a `Q` nor a `queueSize` option.
pub const DEFAULT_QUEUE_SIZE: usize = 4;
/// Errors produced while parsing a `pva://` link string.
#[derive(Debug, thiserror::Error)]
pub enum PvaLinkParseError {
/// The input (after trimming and removing an optional leading `@`) starts
/// with neither `pva://` nor `pva:`. Carries that input text.
#[error("not a pva link: {0:?}")]
NotPvaLink(String),
/// The PV-name portion of the link is empty.
#[error("empty PV name")]
EmptyPv,
/// A query option is malformed or has an unsupported value. Depending on
/// the failing check the payload is the raw value, the raw `key=value`
/// chunk, or a reconstructed `key=value` string.
#[error("invalid option: {0:?}")]
BadOption(String),
}
impl PvaLinkConfig {
    /// Parses a PVA link string into a [`PvaLinkConfig`].
    ///
    /// Accepted shape: optional surrounding whitespace, an optional leading
    /// `@`, the scheme `pva://` or `pva:`, the PV name, an optional
    /// `?key=value&key=value` query, and optional whitespace-separated legacy
    /// modifiers (`PP`, `NPP`, `CP`, `CPP`, `MS`, `MSI`, `MSS`, `NMS`, upper or
    /// lower case).
    ///
    /// Query options are applied in a fixed order that does not depend on
    /// their position in the string. Legacy modifiers are applied last, so a
    /// modifier overrides a conflicting query option. Unrecognised query keys
    /// and unrecognised legacy modifiers are ignored.
    ///
    /// # Errors
    ///
    /// * [`PvaLinkParseError::NotPvaLink`] — the scheme prefix is missing.
    /// * [`PvaLinkParseError::EmptyPv`] — nothing precedes the query / modifiers.
    /// * [`PvaLinkParseError::BadOption`] — a query chunk has no `=`, or a
    ///   recognised option carries a value that cannot be interpreted.
    pub fn parse(s: &str, direction: LinkDirection) -> Result<Self, PvaLinkParseError> {
        let s = s.trim();
        let s = s.strip_prefix('@').unwrap_or(s);
        let body = s
            .strip_prefix("pva://")
            .or_else(|| s.strip_prefix("pva:"))
            .ok_or_else(|| PvaLinkParseError::NotPvaLink(s.to_string()))?;

        let (body, legacy_mods) = strip_legacy_mods(body);
        // The query is validated before the empty-name check, so a malformed
        // query is reported even when the PV name is also missing.
        let (pv_name, opts) = match body.split_once('?') {
            Some((pv, query)) => (pv, parse_query(query)?),
            None => (body, HashMap::new()),
        };
        if pv_name.is_empty() {
            return Err(PvaLinkParseError::EmptyPv);
        }

        let mut cfg = Self::defaults_for(pv_name, direction);

        // Looks up a boolean option: `Ok(None)` when absent, `Err` when the
        // value is present but not a recognised boolean spelling.
        let bool_opt = |key: &str| opts.get(key).map(|v| parse_bool(v)).transpose();

        if let Some(v) = opts.get("field") {
            cfg.field = v.clone();
        }
        if let Some(flag) = bool_opt("monitor")? {
            cfg.monitor = flag;
        }
        if let Some(v) = opts.get("proc") {
            match v.as_str() {
                "TRUE" | "true" | "1" | "PP" | "pp" | "PASSIVE" | "passive" => cfg.process = true,
                "FALSE" | "false" | "0" | "NPP" | "npp" => cfg.process = false,
                "CP" | "cp" => cfg.enable_scan_on_update(false),
                "CPP" | "cpp" => cfg.enable_scan_on_update(true),
                other => return Err(PvaLinkParseError::BadOption(other.to_string())),
            }
        }
        if let Some(flag) = bool_opt("notify")? {
            cfg.notify = flag;
        }
        // Applied after `proc`, so an explicit `scan_on_update=` wins over CP/CPP.
        if let Some(flag) = bool_opt("scan_on_update")? {
            cfg.scan_on_update = flag;
        }
        if let Some(v) = opts.get("sevr") {
            cfg.sevr = parse_sevr(v)?;
        }
        if let Some(flag) = bool_opt("always")? {
            cfg.always = flag;
        }
        if let Some(v) = opts.get("Q").or_else(|| opts.get("queueSize")) {
            let n: i64 = v
                .parse()
                .map_err(|_| PvaLinkParseError::BadOption(format!("Q={v}")))?;
            // Raise to at least 1, then convert with saturation: a plain `as`
            // cast would wrap (possibly to 0) where `usize` is narrower than
            // `i64`, silently defeating the lower bound.
            cfg.queue_size =
                <usize as std::convert::TryFrom<i64>>::try_from(n.max(1)).unwrap_or(usize::MAX);
        }
        if let Some(flag) = bool_opt("pipeline")? {
            cfg.pipeline = flag;
        }
        if let Some(flag) = bool_opt("defer")? {
            cfg.defer = flag;
        }
        if let Some(flag) = bool_opt("retry")? {
            cfg.retry = flag;
        }
        if let Some(flag) = bool_opt("local")? {
            cfg.local = flag;
        }
        if let Some(flag) = bool_opt("atomic")? {
            cfg.atomic = flag;
        }
        if let Some(v) = opts.get("monorder") {
            let n: i64 = v
                .parse()
                .map_err(|_| PvaLinkParseError::BadOption(format!("monorder={v}")))?;
            // The clamp guarantees the value fits in `i32`, so the cast is lossless.
            cfg.monorder = n.clamp(-1024, 1024) as i32;
        }
        if let Some(flag) = bool_opt("time")? {
            cfg.time = flag;
        }

        // Legacy modifiers run last and therefore override the query options.
        for modifier in legacy_mods {
            match modifier.as_str() {
                "PP" | "pp" => cfg.process = true,
                "NPP" | "npp" => cfg.process = false,
                "CP" | "cp" => cfg.enable_scan_on_update(false),
                "CPP" | "cpp" => cfg.enable_scan_on_update(true),
                "MS" | "ms" | "MSS" | "mss" => cfg.sevr = SevrMode::Ms,
                "MSI" | "msi" => cfg.sevr = SevrMode::Msi,
                "NMS" | "nms" => cfg.sevr = SevrMode::Nms,
                // Unknown modifiers are tolerated and skipped.
                _ => {}
            }
        }
        Ok(cfg)
    }

    /// Builds the configuration a link has before any option is applied.
    ///
    /// `field` is `"value"`, `sevr` is [`SevrMode::Nms`], `queue_size` is
    /// [`DEFAULT_QUEUE_SIZE`], `monorder` is `0`, and every boolean is `false`.
    /// `pv_name` is copied and `direction` is stored as given.
    pub fn defaults_for(pv_name: &str, direction: LinkDirection) -> Self {
        PvaLinkConfig {
            pv_name: pv_name.to_string(),
            field: "value".to_string(),
            monitor: false,
            process: false,
            notify: false,
            scan_on_update: false,
            scan_on_passive: false,
            always: false,
            sevr: SevrMode::Nms,
            queue_size: DEFAULT_QUEUE_SIZE,
            pipeline: false,
            defer: false,
            retry: false,
            local: false,
            atomic: false,
            monorder: 0,
            time: false,
            direction,
        }
    }

    /// Applies the `CP` (`passive == false`) or `CPP` (`passive == true`)
    /// setting: turns on `monitor` and `scan_on_update`, and sets
    /// `scan_on_passive` to `passive`.
    fn enable_scan_on_update(&mut self, passive: bool) {
        self.monitor = true;
        self.scan_on_update = true;
        self.scan_on_passive = passive;
    }
}
/// Splits a link body into its leading token and the legacy modifiers that follow it.
///
/// The body is tokenised on whitespace. The first token is returned as the
/// head (borrowed from `body`, never containing whitespace); every remaining
/// token is returned, in order, as an owned modifier string. A blank or empty
/// body yields an empty head and no modifiers.
fn strip_legacy_mods(body: &str) -> (&str, Vec<String>) {
    let mut tokens = body.split_whitespace();
    // Always take the head from the tokeniser so that a lone token is trimmed
    // exactly like a token that is followed by modifiers.
    let head = tokens.next().unwrap_or("");
    let mods = tokens.map(String::from).collect();
    (head, mods)
}
/// Parses the `key=value&key=value` portion of a link into a map.
///
/// Empty chunks (for example from `a=1&&b=2` or a trailing `&`) are skipped.
/// Each remaining chunk is split at its first `=`; a chunk without `=` aborts
/// parsing with `BadOption` carrying that chunk. When a key repeats, the last
/// occurrence wins.
fn parse_query(q: &str) -> Result<HashMap<String, String>, PvaLinkParseError> {
    q.split('&')
        .filter(|pair| !pair.is_empty())
        .map(|pair| match pair.split_once('=') {
            Some((key, value)) => Ok((key.to_string(), value.to_string())),
            None => Err(PvaLinkParseError::BadOption(pair.to_string())),
        })
        // Collecting into `Result<HashMap, _>` stops at the first error and
        // inserts pairs in order, so later duplicates overwrite earlier ones.
        .collect()
}
/// Interprets the value of the `sevr` option.
///
/// Besides the mode names (`NMS`, `MS`, `MSS`, `MSI`, upper or lower case) the
/// boolean spellings are accepted: false-like values select `Nms`, true-like
/// values select `Ms`. Anything else is reported as `BadOption("sevr=<value>")`.
fn parse_sevr(v: &str) -> Result<SevrMode, PvaLinkParseError> {
    const NMS_SPELLINGS: [&str; 7] = ["NMS", "nms", "false", "FALSE", "0", "no", "NO"];
    const MS_SPELLINGS: [&str; 9] = [
        "MS", "ms", "MSS", "mss", "true", "TRUE", "1", "yes", "YES",
    ];
    const MSI_SPELLINGS: [&str; 2] = ["MSI", "msi"];

    if NMS_SPELLINGS.contains(&v) {
        Ok(SevrMode::Nms)
    } else if MS_SPELLINGS.contains(&v) {
        Ok(SevrMode::Ms)
    } else if MSI_SPELLINGS.contains(&v) {
        Ok(SevrMode::Msi)
    } else {
        Err(PvaLinkParseError::BadOption(format!("sevr={v}")))
    }
}
/// Interprets a boolean option value.
///
/// Accepts `true`/`TRUE`/`1`/`yes`/`YES` and `false`/`FALSE`/`0`/`no`/`NO`
/// exactly as written (mixed case is not recognised). Any other value is
/// returned inside `BadOption`.
fn parse_bool(v: &str) -> Result<bool, PvaLinkParseError> {
    let is_true = matches!(v, "true" | "TRUE" | "1" | "yes" | "YES");
    let is_false = matches!(v, "false" | "FALSE" | "0" | "no" | "NO");

    if is_true {
        Ok(true)
    } else if is_false {
        Ok(false)
    } else {
        Err(PvaLinkParseError::BadOption(v.to_string()))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `link` as an input link, panicking at the call site on failure.
    #[track_caller]
    fn inp(link: &str) -> PvaLinkConfig {
        PvaLinkConfig::parse(link, LinkDirection::Inp).unwrap()
    }

    /// Parses `link` as an output link, panicking at the call site on failure.
    #[track_caller]
    fn out(link: &str) -> PvaLinkConfig {
        PvaLinkConfig::parse(link, LinkDirection::Out).unwrap()
    }

    #[test]
    fn bare_pv_name() {
        let cfg = inp("pva://OTHER:PV");
        assert_eq!(cfg.pv_name, "OTHER:PV");
        assert_eq!(cfg.field, "value");
        assert!(!cfg.monitor);
        assert!(!cfg.process);
        assert!(!cfg.time);
    }

    #[test]
    fn time_option_parses_true() {
        assert!(inp("pva://X?time=true").time, "time=true must parse");
        assert!(!inp("pva://X?time=false").time, "time=false must parse");
    }

    #[test]
    fn cp_vs_cpp_distinguished() {
        // Query form.
        let cp = inp("pva://X?proc=CP");
        assert!(cp.scan_on_update, "CP must enable scan_on_update");
        assert!(
            !cp.scan_on_passive,
            "CP must NOT set scan_on_passive (pvxs scanOnUpdateYes)"
        );

        let cpp = inp("pva://X?proc=CPP");
        assert!(cpp.scan_on_update, "CPP must enable scan_on_update");
        assert!(
            cpp.scan_on_passive,
            "CPP must set scan_on_passive (pvxs scanOnUpdatePassive)"
        );

        // Legacy modifier form.
        assert!(
            inp("pva://X CPP").scan_on_passive,
            "bare CPP must set scan_on_passive"
        );
        assert!(
            !inp("pva://X CP").scan_on_passive,
            "bare CP must not set scan_on_passive"
        );
    }

    #[test]
    fn at_prefix_accepted() {
        assert_eq!(out("@pva://X").pv_name, "X");
    }

    #[test]
    fn query_options() {
        let cfg = inp("pva://A?field=alarm.severity&monitor=true&proc=PASSIVE");
        assert_eq!(cfg.field, "alarm.severity");
        assert!(cfg.monitor);
        assert!(cfg.process);
    }

    #[test]
    fn legacy_pp_modifier() {
        let cfg = out("pva://X PP");
        assert_eq!(cfg.pv_name, "X");
        assert!(cfg.process);
    }

    #[test]
    fn empty_pv_rejected() {
        let outcome = PvaLinkConfig::parse("pva://", LinkDirection::Inp);
        assert!(matches!(outcome, Err(PvaLinkParseError::EmptyPv)));
    }

    #[test]
    fn non_pva_rejected() {
        let outcome = PvaLinkConfig::parse("ca://X", LinkDirection::Inp);
        assert!(matches!(outcome, Err(PvaLinkParseError::NotPvaLink(_))));
    }

    #[test]
    fn sevr_defaults_to_nms() {
        assert_eq!(inp("pva://X").sevr, SevrMode::Nms);
    }

    #[test]
    fn sevr_legacy_modifiers() {
        let cases = [
            ("pva://X MS", SevrMode::Ms),
            ("pva://X MSI", SevrMode::Msi),
            ("pva://X NMS", SevrMode::Nms),
            ("pva://X MSS", SevrMode::Ms),
        ];
        for (link, expected) in cases {
            assert_eq!(inp(link).sevr, expected, "link: {link}");
        }
    }

    #[test]
    fn sevr_query_option() {
        let cases = [
            ("pva://X?sevr=MS", SevrMode::Ms),
            ("pva://X?sevr=MSI", SevrMode::Msi),
            ("pva://X?sevr=true", SevrMode::Ms),
            ("pva://X?sevr=false", SevrMode::Nms),
        ];
        for (link, expected) in cases {
            assert_eq!(inp(link).sevr, expected, "link: {link}");
        }
    }

    #[test]
    fn sevr_propagation_semantics() {
        // (severity, expected under Ms, expected under Msi); Nms never propagates.
        let table = [
            (0, false, false),
            (1, true, false),
            (2, true, false),
            (3, true, true),
        ];
        for (sev, ms, msi) in table {
            assert!(!SevrMode::Nms.propagates(sev), "Nms, severity {sev}");
            assert_eq!(SevrMode::Ms.propagates(sev), ms, "Ms, severity {sev}");
            assert_eq!(SevrMode::Msi.propagates(sev), msi, "Msi, severity {sev}");
        }
    }

    #[test]
    fn proc_cp_implies_monitor_and_scan() {
        for link in ["pva://X?proc=CP", "pva://X CPP"] {
            let cfg = inp(link);
            assert!(cfg.monitor, "link: {link}");
            assert!(cfg.scan_on_update, "link: {link}");
        }
    }

    #[test]
    fn queue_size_parsing_and_clamp() {
        let cases = [
            ("pva://X?Q=8", 8),
            ("pva://X?Q=0", 1),
            ("pva://X", DEFAULT_QUEUE_SIZE),
            ("pva://X?queueSize=16", 16),
        ];
        for (link, expected) in cases {
            assert_eq!(inp(link).queue_size, expected, "link: {link}");
        }
    }

    #[test]
    fn monorder_parsing_and_clamp() {
        let cases = [
            ("pva://X?monorder=5", 5),
            ("pva://X?monorder=99999", 1024),
            ("pva://X?monorder=-99999", -1024),
        ];
        for (link, expected) in cases {
            assert_eq!(inp(link).monorder, expected, "link: {link}");
        }
    }

    #[test]
    fn boolean_options_parsed() {
        let cfg =
            out("pva://X?pipeline=true&defer=true&retry=true&local=true&atomic=true&always=true");
        let flags = [
            ("pipeline", cfg.pipeline),
            ("defer", cfg.defer),
            ("retry", cfg.retry),
            ("local", cfg.local),
            ("atomic", cfg.atomic),
            ("always", cfg.always),
        ];
        for (name, value) in flags {
            assert!(value, "{name} should be set");
        }
    }

    #[test]
    fn boolean_options_default_false() {
        let cfg = out("pva://X");
        let flags = [
            ("pipeline", cfg.pipeline),
            ("defer", cfg.defer),
            ("retry", cfg.retry),
            ("local", cfg.local),
            ("atomic", cfg.atomic),
            ("always", cfg.always),
        ];
        for (name, value) in flags {
            assert!(!value, "{name} should default to false");
        }
        assert_eq!(cfg.monorder, 0);
    }

    #[test]
    fn bad_option_value_rejected() {
        for link in ["pva://X?Q=notanumber", "pva://X?sevr=BOGUS"] {
            let outcome = PvaLinkConfig::parse(link, LinkDirection::Inp);
            assert!(
                matches!(outcome, Err(PvaLinkParseError::BadOption(_))),
                "link: {link}"
            );
        }
    }
}