use std::sync::Arc;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::types::{DbFieldType, EpicsValue};
use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure, ScalarValue};
use super::monitor::BridgeMonitor;
use super::provider::Channel;
use super::pvif::{
self, NtType, build_field_desc_for_nt, pv_structure_to_epics, snapshot_to_pv_structure,
};
use crate::convert::{dbf_to_scalar_type, scalar_to_epics_typed};
use crate::error::{BridgeError, BridgeResult};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessMode {
Passive,
Force,
Inhibit,
}
#[derive(Debug, Clone)]
pub struct PutOptions {
pub process: ProcessMode,
pub block: bool,
}
impl Default for PutOptions {
fn default() -> Self {
Self {
process: ProcessMode::Passive,
block: false,
}
}
}
pub fn dbe_mask_from_pv_request(request: &PvStructure) -> Option<u16> {
use epics_base_rs::server::recgbl::EventMask;
let options = request
.get_field("record")
.and_then(|f| match f {
PvField::Structure(s) => s.get_field("_options"),
_ => None,
})
.and_then(|f| match f {
PvField::Structure(s) => Some(s),
_ => None,
})?;
let dbe = options.get_field("DBE")?;
let raw = match dbe {
PvField::Scalar(ScalarValue::String(s)) => s.clone(),
PvField::Scalar(ScalarValue::Int(n)) => return Some((*n as u32 & 0xFFFF) as u16),
PvField::Scalar(ScalarValue::Long(n)) => return Some((*n as u32 & 0xFFFF) as u16),
_ => return None,
};
if let Ok(n) = raw.trim().parse::<u32>() {
return Some((n & 0xFFFF) as u16);
}
let mut mask = EventMask::NONE;
for tok in raw.split(|c: char| c == '|' || c == ',' || c.is_whitespace()) {
let t = tok.trim().to_ascii_uppercase();
let t = t.strip_prefix("DBE_").unwrap_or(&t);
match t {
"" => continue,
"VALUE" => mask |= EventMask::VALUE,
"ALARM" => mask |= EventMask::ALARM,
"LOG" | "ARCHIVE" => mask |= EventMask::LOG,
"PROPERTY" => mask |= EventMask::PROPERTY,
_ => return None,
}
}
if mask.is_empty() {
None
} else {
Some(mask.bits())
}
}
pub fn atomic_from_pv_request(request: &PvStructure) -> Option<bool> {
let options = request
.get_field("record")
.and_then(|f| match f {
PvField::Structure(s) => s.get_field("_options"),
_ => None,
})
.and_then(|f| match f {
PvField::Structure(s) => Some(s),
_ => None,
})?;
match options.get_field("atomic")? {
PvField::Scalar(ScalarValue::Boolean(b)) => Some(*b),
PvField::Scalar(ScalarValue::String(s)) => match s.to_ascii_lowercase().as_str() {
"true" => Some(true),
"false" => Some(false),
_ => None,
},
_ => None,
}
}
impl PutOptions {
pub fn from_pv_request(request: &PvStructure) -> Self {
let mut opts = Self::default();
let options = request
.get_field("record")
.and_then(|f| match f {
PvField::Structure(s) => s.get_field("_options"),
_ => None,
})
.and_then(|f| match f {
PvField::Structure(s) => Some(s),
_ => None,
});
if let Some(opt_struct) = options {
if let Some(PvField::Scalar(ScalarValue::String(s))) = opt_struct.get_field("process") {
opts.process = match s.as_str() {
"true" => ProcessMode::Force,
"false" => ProcessMode::Inhibit,
_ => ProcessMode::Passive,
};
}
if let Some(PvField::Scalar(ScalarValue::Boolean(b))) = opt_struct.get_field("block") {
opts.block = *b;
if opts.process == ProcessMode::Inhibit {
opts.block = false;
}
}
}
opts
}
}
pub struct BridgeChannel {
db: Arc<PvDatabase>,
pv_name: String,
record_name: String,
field: String,
nt_type: NtType,
value_dbf: DbFieldType,
monitor_filters: std::sync::Arc<epics_base_rs::server::database::filters::FilterChain>,
access: super::provider::AccessContext,
}
impl BridgeChannel {
fn nt_type_for_field(record_type: &str, field: &str) -> NtType {
if field.eq_ignore_ascii_case("VAL") {
NtType::from_record_type(record_type)
} else {
NtType::Scalar
}
}
pub fn from_cached(
db: Arc<PvDatabase>,
pv_name: String,
record_name: String,
field: String,
nt_type: NtType,
value_dbf: DbFieldType,
) -> Self {
Self {
db,
pv_name,
record_name,
field,
nt_type,
value_dbf,
monitor_filters: std::sync::Arc::new(
epics_base_rs::server::database::filters::FilterChain::new(),
),
access: super::provider::AccessContext::allow_all(),
}
}
pub fn with_access(mut self, access: super::provider::AccessContext) -> Self {
self.access = access;
self
}
pub async fn new(db: Arc<PvDatabase>, name: &str) -> BridgeResult<Self> {
let parsed = epics_base_rs::server::database::filters::split_channel_name(name);
let monitor_filters = match parsed.json_suffix.as_deref() {
Some(json) => std::sync::Arc::new(
epics_base_rs::server::database::filters::parse_filter_chain(json),
),
None => {
std::sync::Arc::new(epics_base_rs::server::database::filters::FilterChain::new())
}
};
let resolution_name = parsed.record_path.as_str();
let (record_name, field) = epics_base_rs::server::database::parse_pv_name(resolution_name);
let field_upper = field.to_ascii_uppercase();
let rec = db
.get_record(record_name)
.await
.ok_or_else(|| BridgeError::RecordNotFound(record_name.to_string()))?;
let instance = rec.read().await;
let rtyp = instance.record.record_type();
let nt_type = Self::nt_type_for_field(rtyp, &field_upper);
let value_dbf = instance
.record
.field_list()
.iter()
.find(|f| f.name == field_upper)
.map(|f| f.dbf_type)
.unwrap_or(DbFieldType::Double);
Ok(Self {
db,
pv_name: name.to_string(),
record_name: record_name.to_string(),
field: field_upper,
nt_type,
value_dbf,
monitor_filters,
access: super::provider::AccessContext::allow_all(),
})
}
pub fn nt_type(&self) -> NtType {
self.nt_type
}
pub fn value_dbf(&self) -> DbFieldType {
self.value_dbf
}
pub fn record_name(&self) -> &str {
&self.record_name
}
pub fn field(&self) -> &str {
&self.field
}
pub async fn put_with_options(
&self,
value: &PvStructure,
opts: PutOptions,
) -> BridgeResult<()> {
if !self.access.can_write(&self.pv_name) {
return Err(BridgeError::PutRejected(format!(
"write denied for {} (user='{}' host='{}')",
self.pv_name, self.access.user, self.access.host
)));
}
let raw_val = pv_structure_to_epics(value).ok_or_else(|| BridgeError::TypeMismatch {
expected: "extractable value".into(),
got: value.struct_id.to_string(),
})?;
let epics_val = match &raw_val {
EpicsValue::Double(_)
| EpicsValue::Float(_)
| EpicsValue::Short(_)
| EpicsValue::Long(_)
| EpicsValue::Int64(_)
| EpicsValue::UInt64(_)
| EpicsValue::Char(_)
| EpicsValue::Enum(_)
| EpicsValue::String(_) => {
let sv = crate::convert::epics_to_scalar(&raw_val);
scalar_to_epics_typed(&sv, self.value_dbf)
}
_ => raw_val,
};
match opts.process {
ProcessMode::Inhibit => {
self.db
.put_pv(&format!("{}.{}", self.record_name, self.field), epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
}
ProcessMode::Passive => {
let notify_rx = self
.db
.put_record_field_from_ca(&self.record_name, &self.field, epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
if opts.block
&& let Some(rx) = notify_rx
{
let _ = rx.await;
}
}
ProcessMode::Force => {
self.db
.put_pv(&format!("{}.{}", self.record_name, self.field), epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
self.db
.process_record(&self.record_name)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
}
}
Ok(())
}
}
impl Channel for BridgeChannel {
fn channel_name(&self) -> &str {
&self.pv_name
}
async fn get(&self, request: &PvStructure) -> BridgeResult<PvStructure> {
if !self.access.can_read(&self.pv_name) {
return Err(BridgeError::PutRejected(format!(
"read denied for {} (user='{}' host='{}')",
self.pv_name, self.access.user, self.access.host
)));
}
let rec = self
.db
.get_record(&self.record_name)
.await
.ok_or_else(|| BridgeError::RecordNotFound(self.record_name.clone()))?;
let instance = rec.read().await;
let snapshot =
instance
.snapshot_for_field(&self.field)
.ok_or_else(|| BridgeError::FieldNotFound {
record: self.record_name.clone(),
field: self.field.clone(),
})?;
let full = snapshot_to_pv_structure(&snapshot, self.nt_type);
Ok(pvif::filter_by_request(&full, request))
}
async fn put(&self, value: &PvStructure) -> BridgeResult<()> {
let opts = PutOptions::from_pv_request(value);
self.put_with_options(value, opts).await
}
async fn get_field(&self) -> BridgeResult<FieldDesc> {
let scalar_type = dbf_to_scalar_type(self.value_dbf);
Ok(build_field_desc_for_nt(self.nt_type, scalar_type))
}
async fn create_monitor(&self) -> BridgeResult<super::group::AnyMonitor> {
self.create_monitor_with_value_mask(None).await
}
}
impl BridgeChannel {
pub async fn create_monitor_with_value_mask(
&self,
value_mask: Option<u16>,
) -> BridgeResult<super::group::AnyMonitor> {
if !self.access.can_read(&self.pv_name) {
return Err(BridgeError::PutRejected(format!(
"monitor create denied for {} (user='{}' host='{}')",
self.pv_name, self.access.user, self.access.host
)));
}
let mut monitor = BridgeMonitor::new(
self.db.clone(),
self.record_name.clone(),
self.field.clone(),
self.nt_type,
)
.with_access(self.access.clone())
.with_filters(self.monitor_filters.clone());
if let Some(mask) = value_mask {
monitor = monitor.with_value_mask(mask);
}
Ok(super::group::AnyMonitor::Single(Box::new(monitor)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn put_options_default() {
let opts = PutOptions::default();
assert_eq!(opts.process, ProcessMode::Passive);
assert!(!opts.block);
}
#[test]
fn put_options_from_empty_request() {
let req = PvStructure::new("empty");
let opts = PutOptions::from_pv_request(&req);
assert_eq!(opts.process, ProcessMode::Passive);
assert!(!opts.block);
}
#[test]
fn put_options_process_true() {
let mut options = PvStructure::new("");
options.fields.push((
"process".into(),
PvField::Scalar(ScalarValue::String("true".into())),
));
options
.fields
.push(("block".into(), PvField::Scalar(ScalarValue::Boolean(true))));
let mut record = PvStructure::new("");
record
.fields
.push(("_options".into(), PvField::Structure(options)));
let mut req = PvStructure::new("request");
req.fields
.push(("record".into(), PvField::Structure(record)));
let opts = PutOptions::from_pv_request(&req);
assert_eq!(opts.process, ProcessMode::Force);
assert!(opts.block);
}
#[test]
fn put_options_inhibit_disables_block() {
let mut options = PvStructure::new("");
options.fields.push((
"process".into(),
PvField::Scalar(ScalarValue::String("false".into())),
));
options
.fields
.push(("block".into(), PvField::Scalar(ScalarValue::Boolean(true))));
let mut record = PvStructure::new("");
record
.fields
.push(("_options".into(), PvField::Structure(options)));
let mut req = PvStructure::new("request");
req.fields
.push(("record".into(), PvField::Structure(record)));
let opts = PutOptions::from_pv_request(&req);
assert_eq!(opts.process, ProcessMode::Inhibit);
assert!(!opts.block); }
fn req_with_dbe(value: PvField) -> PvStructure {
let mut options = PvStructure::new("");
options.fields.push(("DBE".into(), value));
let mut record = PvStructure::new("");
record
.fields
.push(("_options".into(), PvField::Structure(options)));
let mut req = PvStructure::new("request");
req.fields
.push(("record".into(), PvField::Structure(record)));
req
}
#[test]
fn dbe_mask_parses_value_alarm() {
use epics_base_rs::server::recgbl::EventMask;
let req = req_with_dbe(PvField::Scalar(ScalarValue::String("VALUE | ALARM".into())));
let mask = dbe_mask_from_pv_request(&req).expect("must parse");
assert_eq!(mask, (EventMask::VALUE | EventMask::ALARM).bits());
}
#[test]
fn dbe_mask_accepts_dbe_prefix_and_archive_alias() {
use epics_base_rs::server::recgbl::EventMask;
let req = req_with_dbe(PvField::Scalar(ScalarValue::String(
"DBE_VALUE,DBE_ARCHIVE,PROPERTY".into(),
)));
let mask = dbe_mask_from_pv_request(&req).expect("must parse");
assert_eq!(
mask,
(EventMask::VALUE | EventMask::LOG | EventMask::PROPERTY).bits()
);
}
#[test]
fn dbe_mask_accepts_integer_form() {
let req = req_with_dbe(PvField::Scalar(ScalarValue::Int(5)));
let mask = dbe_mask_from_pv_request(&req).expect("must parse");
assert_eq!(mask, 5);
}
#[test]
fn dbe_mask_absent_returns_none() {
let req = PvStructure::new("request");
assert!(dbe_mask_from_pv_request(&req).is_none());
}
fn req_with_atomic(value: PvField) -> PvStructure {
let mut options = PvStructure::new("");
options.fields.push(("atomic".into(), value));
let mut record = PvStructure::new("");
record
.fields
.push(("_options".into(), PvField::Structure(options)));
let mut req = PvStructure::new("request");
req.fields
.push(("record".into(), PvField::Structure(record)));
req
}
#[test]
fn atomic_option_parses_boolean_true() {
let req = req_with_atomic(PvField::Scalar(ScalarValue::Boolean(true)));
assert_eq!(atomic_from_pv_request(&req), Some(true));
}
#[test]
fn atomic_option_parses_string_false() {
let req = req_with_atomic(PvField::Scalar(ScalarValue::String("false".into())));
assert_eq!(atomic_from_pv_request(&req), Some(false));
}
#[test]
fn atomic_option_absent_returns_none() {
let req = PvStructure::new("request");
assert!(atomic_from_pv_request(&req).is_none());
}
}