use std::sync::Arc;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::database::db_access::DbSubscription;
use epics_base_rs::types::DbFieldType;
use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure, ScalarType};
use super::convert::{dbf_to_scalar_type, epics_to_pv_field};
use super::group_config::{GroupMember, GroupPvDef, TriggerDef};
use super::monitor::BridgeMonitor;
use super::pvif::{self, FieldMapping, NtType};
use crate::error::{BridgeError, BridgeResult};
#[derive(Debug, Clone, PartialEq)]
struct FieldNameComponent {
name: String,
index: Option<u32>,
}
fn parse_field_path(path: &str) -> Vec<FieldNameComponent> {
if path.is_empty() {
return Vec::new();
}
path.split('.')
.filter(|s| !s.is_empty())
.map(|part| {
if let Some(bracket) = part.find('[') {
let name = part[..bracket].to_string();
let rest = &part[bracket + 1..];
let index = rest.strip_suffix(']').and_then(|s| s.parse::<u32>().ok());
FieldNameComponent { name, index }
} else {
FieldNameComponent {
name: part.to_string(),
index: None,
}
}
})
.collect()
}
pub fn get_nested_field<'a>(pv: &'a PvStructure, path: &str) -> Option<&'a PvField> {
let components = parse_field_path(path);
if components.is_empty() {
return None;
}
let mut current_struct = pv;
for (i, comp) in components.iter().enumerate() {
let field = current_struct.get_field(&comp.name)?;
if let Some(idx) = comp.index {
if let PvField::ScalarArray(arr) = field
&& i == components.len() - 1
{
return arr.get(idx as usize).map(|_sv| {
field
});
}
return None;
}
if i == components.len() - 1 {
return Some(field);
}
match field {
PvField::Structure(s) => current_struct = s,
_ => return None,
}
}
None
}
pub fn set_nested_field(pv: &mut PvStructure, path: &str, value: PvField) {
let components = parse_field_path(path);
if components.is_empty() {
return;
}
set_nested_field_recursive(pv, &components, value);
}
fn set_nested_field_recursive(
pv: &mut PvStructure,
components: &[FieldNameComponent],
value: PvField,
) {
if components.is_empty() {
return;
}
let comp = &components[0];
if components.len() == 1 && comp.index.is_none() {
if let Some(pos) = pv.fields.iter().position(|(n, _)| n == &comp.name) {
pv.fields[pos].1 = value;
} else {
pv.fields.push((comp.name.clone(), value));
}
return;
}
let sub = get_or_create_struct_field(pv, &comp.name);
set_nested_field_recursive(sub, &components[1..], value);
}
fn get_or_create_struct_field<'a>(pv: &'a mut PvStructure, name: &str) -> &'a mut PvStructure {
let pos = pv.fields.iter().position(|(n, _)| n == name);
if let Some(pos) = pos {
if !matches!(pv.fields[pos].1, PvField::Structure(_)) {
pv.fields[pos].1 = PvField::Structure(PvStructure::new(""));
}
if let PvField::Structure(ref mut s) = pv.fields[pos].1 {
s
} else {
unreachable!()
}
} else {
pv.fields
.push((name.to_string(), PvField::Structure(PvStructure::new(""))));
if let PvField::Structure(ref mut s) = pv.fields.last_mut().unwrap().1 {
s
} else {
unreachable!()
}
}
}
pub fn set_nested_field_desc(fields: &mut Vec<(String, FieldDesc)>, path: &str, leaf: FieldDesc) {
let components = parse_field_path(path);
if components.is_empty() {
return;
}
set_nested_field_desc_recursive(fields, &components, leaf);
}
fn set_nested_field_desc_recursive(
fields: &mut Vec<(String, FieldDesc)>,
components: &[FieldNameComponent],
leaf: FieldDesc,
) {
if components.is_empty() {
return;
}
let comp = &components[0];
if components.len() == 1 && comp.index.is_none() {
if let Some(pos) = fields.iter().position(|(n, _)| n == &comp.name) {
fields[pos].1 = leaf;
} else {
fields.push((comp.name.clone(), leaf));
}
return;
}
let sub_fields: &mut Vec<(String, FieldDesc)> =
if let Some(pos) = fields.iter().position(|(n, _)| n == &comp.name) {
match &mut fields[pos].1 {
FieldDesc::Structure { fields: f, .. } => f,
other => {
*other = FieldDesc::Structure {
struct_id: String::new(),
fields: Vec::new(),
};
if let FieldDesc::Structure { fields: f, .. } = &mut fields[pos].1 {
f
} else {
unreachable!()
}
}
}
} else {
fields.push((
comp.name.clone(),
FieldDesc::Structure {
struct_id: String::new(),
fields: Vec::new(),
},
));
if let FieldDesc::Structure { fields: f, .. } = &mut fields.last_mut().unwrap().1 {
f
} else {
unreachable!()
}
};
set_nested_field_desc_recursive(sub_fields, &components[1..], leaf);
}
async fn lock_group_records_read(
db: &PvDatabase,
members: &[GroupMember],
) -> Vec<(
String,
tokio::sync::OwnedRwLockReadGuard<epics_base_rs::server::record::RecordInstance>,
)> {
let mut record_names: Vec<String> = members
.iter()
.filter(|m| !m.channel.is_empty())
.map(|m| {
let (rec, _) = epics_base_rs::server::database::parse_pv_name(&m.channel);
rec.to_string()
})
.collect();
record_names.sort();
record_names.dedup();
let mut guards = Vec::new();
for name in &record_names {
if let Some(rec) = db.get_record(name).await {
guards.push((name.clone(), rec.read_owned().await));
}
}
guards
}
pub struct GroupChannel {
db: Arc<PvDatabase>,
def: GroupPvDef,
access: super::provider::AccessContext,
}
impl GroupChannel {
pub fn new(db: Arc<PvDatabase>, def: GroupPvDef) -> Self {
Self {
db,
def,
access: super::provider::AccessContext::allow_all(),
}
}
pub fn with_access(mut self, access: super::provider::AccessContext) -> Self {
self.access = access;
self
}
pub(crate) async fn read_group(&self) -> BridgeResult<PvStructure> {
if !self.access.can_read(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"read denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
let struct_id = self.def.struct_id.as_deref().unwrap_or("structure");
let mut pv = PvStructure::new(struct_id);
let _guards = if self.def.atomic {
lock_group_records_read(&self.db, &self.def.members).await
} else {
Vec::new()
};
for member in &self.def.members {
if member.mapping == FieldMapping::Proc || member.mapping == FieldMapping::Structure {
continue;
}
let field = self.read_member(member).await?;
set_nested_field(&mut pv, &member.field_name, field);
}
Ok(pv)
}
#[allow(dead_code)]
async fn read_partial(&self, field_names: &[String]) -> BridgeResult<PvStructure> {
if !self.access.can_read(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"read denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
let struct_id = self.def.struct_id.as_deref().unwrap_or("structure");
let mut pv = PvStructure::new(struct_id);
for member in &self.def.members {
if member.mapping == FieldMapping::Proc || member.mapping == FieldMapping::Structure {
continue;
}
if !field_names.contains(&member.field_name) {
continue;
}
let field = self.read_member(member).await?;
set_nested_field(&mut pv, &member.field_name, field);
}
Ok(pv)
}
async fn read_member(&self, member: &GroupMember) -> BridgeResult<PvField> {
if member.mapping == FieldMapping::Const {
return Ok(member
.const_value
.clone()
.unwrap_or(PvField::Scalar(epics_pva_rs::pvdata::ScalarValue::Int(0))));
}
if member.mapping == FieldMapping::Structure {
return Ok(PvField::Structure(PvStructure::new("")));
}
if member.mapping == FieldMapping::Proc {
return Ok(PvField::Scalar(epics_pva_rs::pvdata::ScalarValue::Int(0)));
}
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
let rec = self
.db
.get_record(record_name)
.await
.ok_or_else(|| BridgeError::RecordNotFound(record_name.to_string()))?;
let instance = rec.read().await;
match member.mapping {
FieldMapping::Scalar => {
let snapshot = instance.snapshot_for_field(field_name).ok_or_else(|| {
BridgeError::FieldNotFound {
record: record_name.to_string(),
field: field_name.to_string(),
}
})?;
let rtyp = instance.record.record_type();
let nt_type = NtType::from_record_type(rtyp);
Ok(PvField::Structure(pvif::snapshot_to_pv_structure(
&snapshot, nt_type,
)))
}
FieldMapping::Plain => {
let value = instance.resolve_field(field_name).ok_or_else(|| {
BridgeError::FieldNotFound {
record: record_name.to_string(),
field: field_name.to_string(),
}
})?;
Ok(epics_to_pv_field(&value))
}
FieldMapping::Meta => {
let snapshot = instance.snapshot_for_field(field_name).ok_or_else(|| {
BridgeError::FieldNotFound {
record: record_name.to_string(),
field: field_name.to_string(),
}
})?;
let mut meta = PvStructure::new("meta_t");
meta.fields.push((
"alarm".into(),
PvField::Structure(build_alarm_from_snapshot(&snapshot)),
));
meta.fields.push((
"timeStamp".into(),
PvField::Structure(build_timestamp_from_snapshot_masked(
&snapshot,
member.nsec_mask,
)),
));
Ok(PvField::Structure(meta))
}
FieldMapping::Any => {
let value = instance.resolve_field(field_name).ok_or_else(|| {
BridgeError::FieldNotFound {
record: record_name.to_string(),
field: field_name.to_string(),
}
})?;
Ok(epics_to_pv_field(&value))
}
FieldMapping::Proc | FieldMapping::Structure | FieldMapping::Const => unreachable!(),
}
}
async fn introspect_member(&self, member: &GroupMember) -> BridgeResult<(NtType, ScalarType)> {
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
let rec = self
.db
.get_record(record_name)
.await
.ok_or_else(|| BridgeError::RecordNotFound(record_name.to_string()))?;
let instance = rec.read().await;
let rtyp = instance.record.record_type();
let nt_type = NtType::from_record_type(rtyp);
let field_upper = field_name.to_ascii_uppercase();
let value_dbf = instance
.record
.field_list()
.iter()
.find(|f| f.name == field_upper)
.map(|f| f.dbf_type)
.unwrap_or(DbFieldType::Double);
Ok((nt_type, dbf_to_scalar_type(value_dbf)))
}
async fn member_dbf_type(&self, member: &GroupMember) -> DbFieldType {
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
let rec = match self.db.get_record(record_name).await {
Some(r) => r,
None => return DbFieldType::Double,
};
let instance = rec.read().await;
let field_upper = field_name.to_ascii_uppercase();
instance
.record
.field_list()
.iter()
.find(|f| f.name == field_upper)
.map(|f| f.dbf_type)
.unwrap_or(DbFieldType::Double)
}
async fn convert_member_value(
&self,
member: &GroupMember,
pv_field: &epics_pva_rs::pvdata::PvField,
) -> Option<epics_base_rs::types::EpicsValue> {
use epics_pva_rs::pvdata::PvField;
match pv_field {
PvField::Scalar(sv) => {
let target = self.member_dbf_type(member).await;
Some(super::convert::scalar_to_epics_typed(sv, target))
}
_ => super::convert::pv_field_to_epics(pv_field),
}
}
}
impl super::provider::Channel for GroupChannel {
fn channel_name(&self) -> &str {
&self.def.name
}
async fn get(&self, request: &PvStructure) -> BridgeResult<PvStructure> {
if !self.access.can_read(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"read denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
let full = self.read_group().await?;
Ok(pvif::filter_by_request(&full, request))
}
async fn put(&self, value: &PvStructure) -> BridgeResult<()> {
if !self.access.can_write(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"write denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
let opts = super::channel::PutOptions::from_pv_request(value);
let use_process = opts.process != super::channel::ProcessMode::Inhibit;
let mut ordered: Vec<&GroupMember> = self.def.members.iter().collect();
ordered.sort_by_key(|m| m.put_order);
if self.def.atomic {
let mut writes: Vec<(&GroupMember, Option<epics_base_rs::types::EpicsValue>)> =
Vec::new();
for member in &ordered {
if member.mapping == FieldMapping::Proc {
writes.push((member, None));
continue;
}
if member.mapping == FieldMapping::Structure
|| member.mapping == FieldMapping::Const
{
continue; }
let epics_val = match get_nested_field(value, &member.field_name) {
Some(pv_field) => self.convert_member_value(member, pv_field).await,
None => None,
};
writes.push((member, epics_val));
}
for (member, val) in writes {
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
if member.mapping == FieldMapping::Proc {
self.db
.process_record(record_name)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
} else if let Some(epics_val) = val {
if use_process {
self.db
.put_record_field_from_ca(record_name, field_name, epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
} else {
self.db
.put_pv(&format!("{record_name}.{field_name}"), epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
}
}
}
} else {
for member in ordered {
if member.mapping == FieldMapping::Structure
|| member.mapping == FieldMapping::Const
{
continue; }
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
if member.mapping == FieldMapping::Proc {
self.db
.process_record(record_name)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
continue;
}
let pv_field = match get_nested_field(value, &member.field_name) {
Some(f) => f,
None => continue,
};
let epics_val = match self.convert_member_value(member, pv_field).await {
Some(v) => v,
None => continue,
};
if use_process {
self.db
.put_record_field_from_ca(record_name, field_name, epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
} else {
self.db
.put_pv(&format!("{record_name}.{field_name}"), epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
}
}
}
Ok(())
}
async fn get_field(&self) -> BridgeResult<FieldDesc> {
let struct_id = self.def.struct_id.as_deref().unwrap_or("structure");
let mut fields: Vec<(String, FieldDesc)> = Vec::new();
for member in &self.def.members {
if member.mapping == FieldMapping::Proc {
continue;
}
let mut desc = match member.mapping {
FieldMapping::Structure => {
let sid = member.struct_id.as_deref().unwrap_or("");
FieldDesc::Structure {
struct_id: sid.into(),
fields: Vec::new(),
}
}
FieldMapping::Const => {
match &member.const_value {
Some(pv_field) => pv_field_to_field_desc(pv_field),
None => FieldDesc::Scalar(ScalarType::Int),
}
}
_ => {
let (nt_type, scalar_type) = self.introspect_member(member).await?;
match member.mapping {
FieldMapping::Scalar => pvif::build_field_desc_for_nt(nt_type, scalar_type),
FieldMapping::Plain => FieldDesc::Scalar(scalar_type),
FieldMapping::Meta => meta_desc(),
FieldMapping::Any => FieldDesc::Scalar(scalar_type),
_ => continue,
}
}
};
if let Some(member_id) = &member.struct_id
&& let FieldDesc::Structure { struct_id, .. } = &mut desc
{
*struct_id = member_id.clone();
}
set_nested_field_desc(&mut fields, &member.field_name, desc);
}
Ok(FieldDesc::Structure {
struct_id: struct_id.into(),
fields,
})
}
async fn create_monitor(&self) -> BridgeResult<AnyMonitor> {
if !self.access.can_read(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"monitor create denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
Ok(AnyMonitor::Group(Box::new(
GroupMonitor::new(self.db.clone(), self.def.clone()).with_access(self.access.clone()),
)))
}
}
#[derive(Debug, Clone, Copy)]
enum MemberEventKind {
Value,
Property,
}
struct MemberEvent {
member_index: usize,
kind: MemberEventKind,
}
#[derive(Debug, Clone)]
struct FieldPrimingState {
had_value_event: bool,
had_property_event: bool,
}
pub struct GroupMonitor {
db: Arc<PvDatabase>,
def: GroupPvDef,
running: bool,
group_channel: Option<GroupChannel>,
initial_snapshot: Option<PvStructure>,
event_rx: Option<tokio::sync::mpsc::Receiver<MemberEvent>>,
_tasks: Vec<tokio::task::JoinHandle<()>>,
access: super::provider::AccessContext,
priming: Vec<FieldPrimingState>,
events_primed: bool,
}
impl GroupMonitor {
pub fn new(db: Arc<PvDatabase>, def: GroupPvDef) -> Self {
let priming: Vec<FieldPrimingState> = def
.members
.iter()
.map(|member| {
match member.mapping {
FieldMapping::Const | FieldMapping::Structure | FieldMapping::Proc => {
FieldPrimingState {
had_value_event: true,
had_property_event: true,
}
}
FieldMapping::Scalar | FieldMapping::Meta => FieldPrimingState {
had_value_event: false,
had_property_event: false,
},
_ => FieldPrimingState {
had_value_event: false,
had_property_event: true,
},
}
})
.collect();
Self {
db,
def,
running: false,
group_channel: None,
initial_snapshot: None,
event_rx: None,
_tasks: Vec::new(),
access: super::provider::AccessContext::allow_all(),
priming,
events_primed: false,
}
}
pub fn with_access(mut self, access: super::provider::AccessContext) -> Self {
self.access = access;
self
}
}
impl super::provider::PvaMonitor for GroupMonitor {
async fn start(&mut self) -> BridgeResult<()> {
if self.running {
return Ok(());
}
if !self.access.can_read(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"monitor read denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
let (tx, rx) = tokio::sync::mpsc::channel::<MemberEvent>(64);
for (idx, member) in self.def.members.iter().enumerate() {
if member.channel.is_empty() {
continue; }
let (record_name, _) = epics_base_rs::server::database::parse_pv_name(&member.channel);
let value_mask = (epics_base_rs::server::recgbl::EventMask::VALUE
| epics_base_rs::server::recgbl::EventMask::ALARM)
.bits();
if let Some(mut sub) =
DbSubscription::subscribe_with_mask(&self.db, record_name, 0, value_mask).await
{
let tx = tx.clone();
let handle = tokio::spawn(async move {
while sub.recv_snapshot().await.is_some() {
if tx
.send(MemberEvent {
member_index: idx,
kind: MemberEventKind::Value,
})
.await
.is_err()
{
break;
}
}
});
self._tasks.push(handle);
} else {
if let Some(state) = self.priming.get_mut(idx) {
state.had_value_event = true;
}
}
if member.mapping == FieldMapping::Scalar || member.mapping == FieldMapping::Meta {
let prop_mask = epics_base_rs::server::recgbl::EventMask::PROPERTY.bits();
if let Some(mut sub) =
DbSubscription::subscribe_with_mask(&self.db, record_name, 0, prop_mask).await
{
let tx = tx.clone();
let handle = tokio::spawn(async move {
while sub.recv_snapshot().await.is_some() {
if tx
.send(MemberEvent {
member_index: idx,
kind: MemberEventKind::Property,
})
.await
.is_err()
{
break;
}
}
});
self._tasks.push(handle);
} else {
if let Some(state) = self.priming.get_mut(idx) {
state.had_property_event = true;
}
}
}
}
let group_channel =
GroupChannel::new(self.db.clone(), self.def.clone()).with_access(self.access.clone());
let all_primed = self
.priming
.iter()
.all(|p| p.had_value_event && p.had_property_event);
if all_primed {
if let Ok(snapshot) = group_channel.read_group().await {
self.initial_snapshot = Some(snapshot);
}
self.events_primed = true;
}
self.group_channel = Some(group_channel);
self.event_rx = Some(rx);
self.running = true;
Ok(())
}
async fn poll(&mut self) -> Option<PvStructure> {
if let Some(initial) = self.initial_snapshot.take() {
return Some(initial);
}
let rx = self.event_rx.as_mut()?;
loop {
let event = rx.recv().await?;
if !self.events_primed {
if let Some(state) = self.priming.get_mut(event.member_index) {
match event.kind {
MemberEventKind::Value => state.had_value_event = true,
MemberEventKind::Property => state.had_property_event = true,
}
}
let all_primed = self
.priming
.iter()
.all(|p| p.had_value_event && p.had_property_event);
if all_primed {
self.events_primed = true;
let group_channel = self.group_channel.as_ref()?;
return group_channel.read_group().await.ok();
}
continue;
}
let member = match self.def.members.get(event.member_index) {
Some(m) => m,
None => continue,
};
let group_channel = self.group_channel.as_ref()?;
if matches!(event.kind, MemberEventKind::Property) {
return group_channel.read_group().await.ok();
}
match &member.triggers {
TriggerDef::None => continue,
TriggerDef::All | TriggerDef::Fields(_) => {
return group_channel.read_group().await.ok();
}
}
}
}
async fn stop(&mut self) {
self.event_rx = None;
for handle in self._tasks.drain(..) {
handle.abort();
}
self.running = false;
self.group_channel = None;
self.initial_snapshot = None;
self.events_primed = false;
for (i, member) in self.def.members.iter().enumerate() {
if let Some(state) = self.priming.get_mut(i) {
match member.mapping {
FieldMapping::Const | FieldMapping::Structure | FieldMapping::Proc => {
state.had_value_event = true;
state.had_property_event = true;
}
FieldMapping::Scalar | FieldMapping::Meta => {
state.had_value_event = false;
state.had_property_event = false;
}
_ => {
state.had_value_event = false;
state.had_property_event = true;
}
}
}
}
}
}
pub enum AnyMonitor {
Single(BridgeMonitor),
Group(Box<GroupMonitor>),
}
impl super::provider::PvaMonitor for AnyMonitor {
async fn poll(&mut self) -> Option<PvStructure> {
match self {
Self::Single(m) => m.poll().await,
Self::Group(m) => m.poll().await,
}
}
async fn start(&mut self) -> BridgeResult<()> {
match self {
Self::Single(m) => m.start().await,
Self::Group(m) => m.start().await,
}
}
async fn stop(&mut self) {
match self {
Self::Single(m) => m.stop().await,
Self::Group(m) => m.stop().await,
}
}
}
fn meta_desc() -> FieldDesc {
FieldDesc::Structure {
struct_id: "meta_t".into(),
fields: vec![
(
"alarm".into(),
FieldDesc::Structure {
struct_id: "alarm_t".into(),
fields: vec![
("severity".into(), FieldDesc::Scalar(ScalarType::Int)),
("status".into(), FieldDesc::Scalar(ScalarType::Int)),
("message".into(), FieldDesc::Scalar(ScalarType::String)),
],
},
),
(
"timeStamp".into(),
FieldDesc::Structure {
struct_id: "time_t".into(),
fields: vec![
(
"secondsPastEpoch".into(),
FieldDesc::Scalar(ScalarType::Long),
),
("nanoseconds".into(), FieldDesc::Scalar(ScalarType::Int)),
("userTag".into(), FieldDesc::Scalar(ScalarType::Int)),
],
},
),
],
}
}
fn pv_field_to_field_desc(field: &PvField) -> FieldDesc {
use epics_pva_rs::pvdata::ScalarValue;
match field {
PvField::Scalar(sv) => FieldDesc::Scalar(match sv {
ScalarValue::Boolean(_) => ScalarType::Boolean,
ScalarValue::Byte(_) => ScalarType::Byte,
ScalarValue::Short(_) => ScalarType::Short,
ScalarValue::Int(_) => ScalarType::Int,
ScalarValue::Long(_) => ScalarType::Long,
ScalarValue::UByte(_) => ScalarType::UByte,
ScalarValue::UShort(_) => ScalarType::UShort,
ScalarValue::UInt(_) => ScalarType::UInt,
ScalarValue::ULong(_) => ScalarType::ULong,
ScalarValue::Float(_) => ScalarType::Float,
ScalarValue::Double(_) => ScalarType::Double,
ScalarValue::String(_) => ScalarType::String,
}),
PvField::ScalarArray(arr) => {
let elem_type = arr
.first()
.map(|sv| match sv {
ScalarValue::Boolean(_) => ScalarType::Boolean,
ScalarValue::Byte(_) => ScalarType::Byte,
ScalarValue::Short(_) => ScalarType::Short,
ScalarValue::Int(_) => ScalarType::Int,
ScalarValue::Long(_) => ScalarType::Long,
ScalarValue::UByte(_) => ScalarType::UByte,
ScalarValue::UShort(_) => ScalarType::UShort,
ScalarValue::UInt(_) => ScalarType::UInt,
ScalarValue::ULong(_) => ScalarType::ULong,
ScalarValue::Float(_) => ScalarType::Float,
ScalarValue::Double(_) => ScalarType::Double,
ScalarValue::String(_) => ScalarType::String,
})
.unwrap_or(ScalarType::Double);
FieldDesc::ScalarArray(elem_type)
}
PvField::Structure(s) => FieldDesc::Structure {
struct_id: s.struct_id.clone(),
fields: s
.fields
.iter()
.map(|(name, f)| (name.clone(), pv_field_to_field_desc(f)))
.collect(),
},
}
}
fn build_alarm_from_snapshot(snapshot: &epics_base_rs::server::snapshot::Snapshot) -> PvStructure {
use epics_pva_rs::pvdata::ScalarValue;
let mut alarm = PvStructure::new("alarm_t");
alarm.fields.push((
"severity".into(),
PvField::Scalar(ScalarValue::Int(snapshot.alarm.severity as i32)),
));
alarm.fields.push((
"status".into(),
PvField::Scalar(ScalarValue::Int(snapshot.alarm.status as i32)),
));
alarm.fields.push((
"message".into(),
PvField::Scalar(ScalarValue::String(String::new())),
));
alarm
}
fn build_timestamp_from_snapshot_masked(
snapshot: &epics_base_rs::server::snapshot::Snapshot,
nsec_mask: u32,
) -> PvStructure {
use epics_pva_rs::pvdata::ScalarValue;
use std::time::UNIX_EPOCH;
let mut ts = PvStructure::new("time_t");
let (secs, raw_nanos) = match snapshot.timestamp.duration_since(UNIX_EPOCH) {
Ok(d) => (d.as_secs() as i64, d.subsec_nanos()),
Err(_) => (0, 0),
};
let nanos = if nsec_mask != 0 {
(raw_nanos & !nsec_mask) as i32
} else {
raw_nanos as i32
};
let user_tag = if nsec_mask != 0 {
(raw_nanos & nsec_mask) as i32
} else {
0
};
ts.fields.push((
"secondsPastEpoch".into(),
PvField::Scalar(ScalarValue::Long(secs)),
));
ts.fields.push((
"nanoseconds".into(),
PvField::Scalar(ScalarValue::Int(nanos)),
));
ts.fields.push((
"userTag".into(),
PvField::Scalar(ScalarValue::Int(user_tag)),
));
ts
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn nested_field_set_simple() {
let mut pv = PvStructure::new("test");
set_nested_field(
&mut pv,
"x",
PvField::Scalar(epics_pva_rs::pvdata::ScalarValue::Int(42)),
);
assert!(pv.get_field("x").is_some());
}
#[test]
fn nested_field_set_deep() {
let mut pv = PvStructure::new("test");
set_nested_field(
&mut pv,
"a.b.c",
PvField::Scalar(epics_pva_rs::pvdata::ScalarValue::Double(2.5)),
);
let a = pv.get_field("a");
assert!(a.is_some());
if let Some(PvField::Structure(a_struct)) = a {
if let Some(PvField::Structure(b_struct)) = a_struct.get_field("b") {
assert!(b_struct.get_field("c").is_some());
} else {
panic!("expected b structure");
}
} else {
panic!("expected a structure");
}
}
#[test]
fn nested_field_roundtrip() {
use epics_pva_rs::pvdata::ScalarValue;
let mut pv = PvStructure::new("test");
set_nested_field(&mut pv, "a.b", PvField::Scalar(ScalarValue::Int(99)));
let field = get_nested_field(&pv, "a.b");
assert!(field.is_some());
if let Some(PvField::Scalar(ScalarValue::Int(v))) = field {
assert_eq!(*v, 99);
} else {
panic!("expected Int(99)");
}
}
#[test]
fn nested_field_overwrite() {
use epics_pva_rs::pvdata::ScalarValue;
let mut pv = PvStructure::new("test");
set_nested_field(&mut pv, "x.y", PvField::Scalar(ScalarValue::Int(1)));
set_nested_field(&mut pv, "x.y", PvField::Scalar(ScalarValue::Int(2)));
if let Some(PvField::Scalar(ScalarValue::Int(v))) = get_nested_field(&pv, "x.y") {
assert_eq!(*v, 2);
} else {
panic!("expected Int(2)");
}
}
#[test]
fn nested_field_siblings() {
use epics_pva_rs::pvdata::ScalarValue;
let mut pv = PvStructure::new("test");
set_nested_field(&mut pv, "a.x", PvField::Scalar(ScalarValue::Int(1)));
set_nested_field(&mut pv, "a.y", PvField::Scalar(ScalarValue::Int(2)));
assert!(get_nested_field(&pv, "a.x").is_some());
assert!(get_nested_field(&pv, "a.y").is_some());
}
#[test]
fn nested_desc_simple() {
use epics_pva_rs::pvdata::ScalarType;
let mut fields: Vec<(String, FieldDesc)> = Vec::new();
set_nested_field_desc(&mut fields, "x", FieldDesc::Scalar(ScalarType::Double));
assert_eq!(fields.len(), 1);
assert_eq!(fields[0].0, "x");
assert!(matches!(fields[0].1, FieldDesc::Scalar(ScalarType::Double)));
}
#[test]
fn nested_desc_deep() {
use epics_pva_rs::pvdata::ScalarType;
let mut fields: Vec<(String, FieldDesc)> = Vec::new();
set_nested_field_desc(
&mut fields,
"axis.position",
FieldDesc::Scalar(ScalarType::Double),
);
set_nested_field_desc(
&mut fields,
"axis.velocity",
FieldDesc::Scalar(ScalarType::Double),
);
assert_eq!(fields.len(), 1);
assert_eq!(fields[0].0, "axis");
if let FieldDesc::Structure { fields: sub, .. } = &fields[0].1 {
assert_eq!(sub.len(), 2);
assert_eq!(sub[0].0, "position");
assert_eq!(sub[1].0, "velocity");
} else {
panic!("expected nested structure");
}
}
#[test]
fn nested_desc_overwrite() {
use epics_pva_rs::pvdata::ScalarType;
let mut fields: Vec<(String, FieldDesc)> = Vec::new();
set_nested_field_desc(&mut fields, "x", FieldDesc::Scalar(ScalarType::Int));
set_nested_field_desc(&mut fields, "x", FieldDesc::Scalar(ScalarType::Double));
assert_eq!(fields.len(), 1);
assert!(matches!(fields[0].1, FieldDesc::Scalar(ScalarType::Double)));
}
#[test]
fn nested_desc_mixed_depth() {
use epics_pva_rs::pvdata::ScalarType;
let mut fields: Vec<(String, FieldDesc)> = Vec::new();
set_nested_field_desc(&mut fields, "name", FieldDesc::Scalar(ScalarType::String));
set_nested_field_desc(
&mut fields,
"axis.position",
FieldDesc::Scalar(ScalarType::Double),
);
assert_eq!(fields.len(), 2);
assert_eq!(fields[0].0, "name");
assert_eq!(fields[1].0, "axis");
}
#[test]
fn parse_field_path_simple() {
let comps = parse_field_path("abc");
assert_eq!(comps.len(), 1);
assert_eq!(comps[0].name, "abc");
assert_eq!(comps[0].index, None);
}
#[test]
fn parse_field_path_dotted() {
let comps = parse_field_path("a.b.c");
assert_eq!(comps.len(), 3);
assert_eq!(comps[0].name, "a");
assert_eq!(comps[1].name, "b");
assert_eq!(comps[2].name, "c");
assert!(comps.iter().all(|c| c.index.is_none()));
}
#[test]
fn parse_field_path_with_index() {
let comps = parse_field_path("a.b[0].c");
assert_eq!(comps.len(), 3);
assert_eq!(comps[0].name, "a");
assert_eq!(comps[0].index, None);
assert_eq!(comps[1].name, "b");
assert_eq!(comps[1].index, Some(0));
assert_eq!(comps[2].name, "c");
assert_eq!(comps[2].index, None);
}
#[test]
fn parse_field_path_index_at_leaf() {
let comps = parse_field_path("arr[3]");
assert_eq!(comps.len(), 1);
assert_eq!(comps[0].name, "arr");
assert_eq!(comps[0].index, Some(3));
}
#[test]
fn parse_field_path_multiple_indices() {
let comps = parse_field_path("a[1].b[2]");
assert_eq!(comps.len(), 2);
assert_eq!(comps[0].index, Some(1));
assert_eq!(comps[1].index, Some(2));
}
}