use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::database::db_access::DbSubscription;
use epics_base_rs::types::DbFieldType;
use epics_pva_rs::pvdata::{FieldDesc, PvField, PvStructure, ScalarType};
use super::group_config::{GroupMember, GroupPvDef, TriggerDef};
use super::monitor::BridgeMonitor;
use super::pvif::{self, FieldMapping, NtType};
use crate::convert::{dbf_to_scalar_type, epics_to_pv_field};
use crate::error::{BridgeError, BridgeResult};
/// One dot-separated segment of a group field path, such as `name` or `name[3]`.
#[derive(Debug, Clone, PartialEq)]
struct FieldNameComponent {
    /// Segment text before the first `[` (the whole segment when there is none).
    name: String,
    /// Array index, present only when the segment ends in a well-formed `[N]`.
    index: Option<u32>,
}

/// Split `path` on `.` into components, silently dropping empty segments.
///
/// A segment containing `[` is cut at the first bracket: the text before it
/// becomes the name, and the remainder is accepted as an index only when it
/// ends with `]` and the text in between parses as a `u32`. A malformed
/// index is not an error; it simply yields `index: None`.
fn parse_field_path(path: &str) -> Vec<FieldNameComponent> {
    let mut components = Vec::new();
    for segment in path.split('.') {
        if segment.is_empty() {
            continue;
        }
        let component = match segment.split_once('[') {
            Some((name, tail)) => FieldNameComponent {
                name: name.to_string(),
                index: tail
                    .strip_suffix(']')
                    .and_then(|digits| digits.parse::<u32>().ok()),
            },
            None => FieldNameComponent {
                name: segment.to_string(),
                index: None,
            },
        };
        components.push(component);
    }
    components
}
/// Look up the field addressed by a dotted `path` (optionally with `[N]`
/// indices) inside `pv`.
///
/// Intermediate components must resolve to a structure (no index) or to an
/// element of a structure array (with index). The final component is
/// returned borrowed when it has no index; an indexed final component
/// yields an owned clone of the selected scalar-array or structure-array
/// element. Returns `None` for an empty path, a missing field, an
/// out-of-range index, or a component whose kind does not match.
pub fn get_nested_field<'a>(pv: &'a PvStructure, path: &str) -> Option<Cow<'a, PvField>> {
    let components = parse_field_path(path);
    let (leaf, parents) = components.split_last()?;

    // Descend through every component except the last one.
    let mut cursor = pv;
    for step in parents {
        let field = cursor.get_field(&step.name)?;
        cursor = match (field, step.index) {
            (PvField::StructureArray(items), Some(idx)) => items.get(idx as usize)?,
            (PvField::Structure(inner), None) => inner,
            _ => return None,
        };
    }

    let field = cursor.get_field(&leaf.name)?;
    match leaf.index {
        None => Some(Cow::Borrowed(field)),
        Some(idx) => match field {
            PvField::ScalarArray(values) => {
                let picked = values.get(idx as usize)?.clone();
                Some(Cow::Owned(PvField::Scalar(picked)))
            }
            PvField::StructureArray(items) => {
                let picked = items.get(idx as usize)?.clone();
                Some(Cow::Owned(PvField::Structure(picked)))
            }
            _ => None,
        },
    }
}
/// Monitor queue size used when no usable `queueSize` was supplied
/// (see `negotiated_queue_size`, which falls back to this value).
pub const GROUP_DEFAULT_QUEUE_SIZE: i32 = 4;
pub fn negotiated_queue_size(pv_request: &PvStructure) -> i32 {
use epics_pva_rs::pvdata::ScalarValue;
let parsed = pv_request
.fields
.iter()
.find_map(|(k, v)| (k == "record").then_some(v))
.and_then(|v| match v {
PvField::Structure(s) => Some(s),
_ => None,
})
.and_then(|rec| {
rec.fields
.iter()
.find_map(|(k, v)| (k == "_options").then_some(v))
})
.and_then(|v| match v {
PvField::Structure(s) => Some(s),
_ => None,
})
.and_then(|opt| {
opt.fields
.iter()
.find_map(|(k, v)| (k == "queueSize").then_some(v))
})
.and_then(|v| match v {
PvField::Scalar(ScalarValue::String(s)) => s.parse::<i32>().ok(),
PvField::Scalar(ScalarValue::Byte(i)) => Some(i32::from(*i)),
PvField::Scalar(ScalarValue::UByte(i)) => Some(i32::from(*i)),
PvField::Scalar(ScalarValue::Short(i)) => Some(i32::from(*i)),
PvField::Scalar(ScalarValue::UShort(i)) => Some(i32::from(*i)),
PvField::Scalar(ScalarValue::Int(i)) => Some(*i),
PvField::Scalar(ScalarValue::UInt(i)) => i32::try_from(*i).ok(),
PvField::Scalar(ScalarValue::Long(l)) => i32::try_from(*l).ok(),
PvField::Scalar(ScalarValue::ULong(l)) => i32::try_from(*l).ok(),
_ => None,
});
match parsed {
Some(n) if n >= 2 => n,
_ => GROUP_DEFAULT_QUEUE_SIZE,
}
}
/// Attach a `record._options` sub-structure carrying `queueSize` and
/// `atomic` to `pv`.
///
/// An existing top-level `record` field is overwritten in place (keeping
/// its position); otherwise the new field is appended.
pub fn push_record_options(pv: &mut PvStructure, atomic: bool, queue_size: i32) {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut options = PvStructure::new("");
    options.fields.push((
        String::from("queueSize"),
        PvField::Scalar(ScalarValue::Int(queue_size)),
    ));
    options.fields.push((
        String::from("atomic"),
        PvField::Scalar(ScalarValue::Boolean(atomic)),
    ));

    let mut record = PvStructure::new("");
    record
        .fields
        .push((String::from("_options"), PvField::Structure(options)));
    let record_field = PvField::Structure(record);

    match pv.fields.iter().position(|(name, _)| name == "record") {
        Some(slot) => pv.fields[slot].1 = record_field,
        None => pv.fields.push((String::from("record"), record_field)),
    }
}
/// Store a group member's `value` into the composed group structure `pv`.
///
/// A `Meta` member with an empty field name is merged at the top level:
/// each sub-field of the structure value replaces a same-named top-level
/// field or is appended (a non-structure value is ignored). Every other
/// member is placed at its dotted `field_name` via `set_nested_field`.
pub fn set_member_field(pv: &mut PvStructure, member: &GroupMember, value: PvField) {
    use crate::qsrv::FieldMapping;

    let merge_at_root = member.field_name.is_empty() && member.mapping == FieldMapping::Meta;
    if !merge_at_root {
        set_nested_field(pv, &member.field_name, value);
        return;
    }

    let PvField::Structure(meta) = value else {
        return;
    };
    for (name, sub) in meta.fields {
        match pv.fields.iter().position(|(existing, _)| existing == &name) {
            Some(slot) => pv.fields[slot].1 = sub,
            None => pv.fields.push((name, sub)),
        }
    }
}
/// Write `value` at the dotted `path` inside `pv`, creating intermediate
/// structures as needed. A path with no components is a no-op.
pub fn set_nested_field(pv: &mut PvStructure, path: &str, value: PvField) {
    let components = parse_field_path(path);
    if !components.is_empty() {
        set_nested_field_recursive(pv, &components, value);
    }
}
/// Recursive worker for `set_nested_field`.
///
/// The last component (when it carries no index) replaces or appends the
/// named field; every other component is resolved to a child structure,
/// which is created — or substituted for a non-structure field — on demand.
///
/// NOTE(review): array indices are not honoured when writing. An indexed
/// intermediate component is treated as a plain structure name, and an
/// indexed *final* component turns the named field into an empty structure
/// and then drops `value`. Confirm that callers never pass indexed paths.
fn set_nested_field_recursive(
    pv: &mut PvStructure,
    components: &[FieldNameComponent],
    value: PvField,
) {
    let Some((head, rest)) = components.split_first() else {
        return;
    };

    let is_plain_leaf = rest.is_empty() && head.index.is_none();
    if is_plain_leaf {
        match pv.fields.iter().position(|(name, _)| name == &head.name) {
            Some(slot) => pv.fields[slot].1 = value,
            None => pv.fields.push((head.name.clone(), value)),
        }
        return;
    }

    let child = get_or_create_struct_field(pv, &head.name);
    set_nested_field_recursive(child, rest, value);
}
/// Return a mutable reference to the structure-valued field `name` of `pv`.
///
/// If the field is missing, an empty structure is appended. If it exists
/// but holds something other than a structure, it is overwritten with an
/// empty structure.
fn get_or_create_struct_field<'a>(pv: &'a mut PvStructure, name: &str) -> &'a mut PvStructure {
    // First settle *where* the structure lives, then borrow it once.
    let slot = match pv.fields.iter().position(|(n, _)| n == name) {
        Some(found) => {
            if !matches!(pv.fields[found].1, PvField::Structure(_)) {
                pv.fields[found].1 = PvField::Structure(PvStructure::new(""));
            }
            found
        }
        None => {
            pv.fields
                .push((name.to_string(), PvField::Structure(PvStructure::new(""))));
            pv.fields.len() - 1
        }
    };

    match &mut pv.fields[slot].1 {
        PvField::Structure(inner) => inner,
        // The slot was just ensured to hold a structure.
        _ => unreachable!(),
    }
}
/// Type-description counterpart of `set_member_field`.
///
/// A `Meta` member with an empty field name has the sub-fields of its
/// structure descriptor merged into the top-level `fields` list (replacing
/// same-named entries, appending the rest; a non-structure descriptor is
/// ignored). Any other member's descriptor is placed at its dotted
/// `field_name`.
pub fn set_member_field_desc(
    fields: &mut Vec<(String, FieldDesc)>,
    member: &GroupMember,
    leaf: FieldDesc,
) {
    use crate::qsrv::FieldMapping;

    let merge_at_root = member.field_name.is_empty() && member.mapping == FieldMapping::Meta;
    if !merge_at_root {
        set_nested_field_desc(fields, &member.field_name, leaf);
        return;
    }

    let FieldDesc::Structure {
        fields: meta_fields,
        ..
    } = leaf
    else {
        return;
    };
    for (name, sub) in meta_fields {
        match fields.iter().position(|(existing, _)| existing == &name) {
            Some(slot) => fields[slot].1 = sub,
            None => fields.push((name, sub)),
        }
    }
}
/// Place the descriptor `leaf` at the dotted `path` within `fields`,
/// creating intermediate structure descriptors as needed. A path with no
/// components is a no-op.
pub fn set_nested_field_desc(fields: &mut Vec<(String, FieldDesc)>, path: &str, leaf: FieldDesc) {
    let components = parse_field_path(path);
    if !components.is_empty() {
        set_nested_field_desc_recursive(fields, &components, leaf);
    }
}
/// Recursive worker for `set_nested_field_desc`.
///
/// The last component (when it carries no index) replaces or appends the
/// named descriptor. Every other component is resolved to a structure
/// descriptor with an empty `struct_id`, which is appended when missing or
/// substituted when the existing entry is not a structure.
///
/// NOTE(review): as with the value-side setter, indices are ignored here and
/// an indexed final component discards `leaf` — confirm this is intended.
fn set_nested_field_desc_recursive(
    fields: &mut Vec<(String, FieldDesc)>,
    components: &[FieldNameComponent],
    leaf: FieldDesc,
) {
    let Some((head, rest)) = components.split_first() else {
        return;
    };

    if rest.is_empty() && head.index.is_none() {
        match fields.iter().position(|(name, _)| name == &head.name) {
            Some(slot) => fields[slot].1 = leaf,
            None => fields.push((head.name.clone(), leaf)),
        }
        return;
    }

    // Make sure a structure descriptor exists under this name and note its slot.
    let slot = match fields.iter().position(|(name, _)| name == &head.name) {
        Some(found) => {
            if !matches!(fields[found].1, FieldDesc::Structure { .. }) {
                fields[found].1 = FieldDesc::Structure {
                    struct_id: String::new(),
                    fields: Vec::new(),
                };
            }
            found
        }
        None => {
            fields.push((
                head.name.clone(),
                FieldDesc::Structure {
                    struct_id: String::new(),
                    fields: Vec::new(),
                },
            ));
            fields.len() - 1
        }
    };

    let FieldDesc::Structure {
        fields: children, ..
    } = &mut fields[slot].1
    else {
        unreachable!()
    };
    set_nested_field_desc_recursive(children, rest, leaf);
}
/// Acquire owned read guards on every distinct record referenced by
/// `members`.
///
/// Record names are taken from each non-empty member channel, then sorted
/// and de-duplicated so locks are always requested in the same order.
/// Records that the database cannot find are skipped. Each returned entry
/// pairs the record name (as written in the channel) with its guard.
async fn lock_group_records_read(
    db: &PvDatabase,
    members: &[GroupMember],
) -> Vec<(
    String,
    tokio::sync::OwnedRwLockReadGuard<epics_base_rs::server::record::RecordInstance>,
)> {
    let mut record_names: Vec<String> = Vec::new();
    for member in members {
        if member.channel.is_empty() {
            continue;
        }
        let (record, _) = epics_base_rs::server::database::parse_pv_name(&member.channel);
        record_names.push(record.to_string());
    }
    record_names.sort();
    record_names.dedup();

    let mut guards = Vec::new();
    for name in record_names {
        if let Some(rec) = db.get_record(&name).await {
            let guard = rec.read_owned().await;
            guards.push((name, guard));
        }
    }
    guards
}
/// Collect the sorted, de-duplicated record names behind `members`.
///
/// Members without a channel are ignored. Each record name is passed
/// through `resolve_alias`; when that yields nothing the name from the
/// channel is used unchanged.
async fn group_member_record_names(db: &PvDatabase, members: &[GroupMember]) -> Vec<String> {
    let mut resolved: Vec<String> = Vec::new();
    for member in members {
        if !member.channel.is_empty() {
            let (record, _) = epics_base_rs::server::database::parse_pv_name(&member.channel);
            let canonical = match db.resolve_alias(record).await {
                Some(target) => target,
                None => record.to_string(),
            };
            resolved.push(canonical);
        }
    }
    resolved.sort();
    resolved.dedup();
    resolved
}
/// Channel over a group PV: a composite structure assembled from the
/// members listed in a `GroupPvDef`.
pub struct GroupChannel {
// Database the member records are looked up in.
db: Arc<PvDatabase>,
// Group definition (name, members, atomic flag, optional struct id).
def: GroupPvDef,
// Access context consulted via `can_read` / `can_write`.
access: super::provider::AccessContext,
// Queue size reported in `record._options`; `None` means use the default.
monitor_queue_size: Option<i32>,
}
impl GroupChannel {
/// Create a channel for `def` with an allow-all access context and no
/// explicit monitor queue size.
pub fn new(db: Arc<PvDatabase>, def: GroupPvDef) -> Self {
Self {
db,
def,
access: super::provider::AccessContext::allow_all(),
monitor_queue_size: None,
}
}
/// Builder-style setter replacing the access context.
pub fn with_access(mut self, access: super::provider::AccessContext) -> Self {
self.access = access;
self
}
/// Builder-style setter for the queue size echoed in `record._options`
/// by `read_group_atomic`.
pub fn with_monitor_queue_size(mut self, queue_size: i32) -> Self {
self.monitor_queue_size = Some(queue_size);
self
}
/// Read the whole group using the atomicity configured in the group
/// definition.
pub(crate) async fn read_group(&self) -> BridgeResult<PvStructure> {
self.read_group_atomic(self.def.atomic).await
}
/// Assemble the group structure from all readable members.
///
/// With `atomic` set, read guards on every member record are acquired up
/// front and all members are decoded while those guards are held;
/// otherwise each member is read with its own short-lived lock. `Proc` and
/// `Structure` members contribute no value. A `record._options` entry
/// (queue size and the `atomic` flag used) is attached to the result.
///
/// # Errors
/// Returns `BridgeError::PutRejected` when the access context denies
/// reading the group, and propagates any member read error.
pub(crate) async fn read_group_atomic(&self, atomic: bool) -> BridgeResult<PvStructure> {
    if !self.access.can_read(&self.def.name) {
        return Err(BridgeError::PutRejected(format!(
            "read denied for group {} (user='{}' host='{}')",
            self.def.name, self.access.user, self.access.host
        )));
    }

    let mut pv = PvStructure::new(self.def.struct_id.as_deref().unwrap_or("structure"));

    if atomic {
        let guards = lock_group_records_read(&self.db, &self.def.members).await;
        let mut guard_map: HashMap<&str, &epics_base_rs::server::record::RecordInstance> =
            HashMap::with_capacity(guards.len());
        for (name, guard) in &guards {
            guard_map.insert(name.as_str(), &**guard);
        }
        for member in &self.def.members {
            if matches!(
                member.mapping,
                FieldMapping::Proc | FieldMapping::Structure
            ) {
                continue;
            }
            let field = self.read_member_locked(member, &guard_map)?;
            set_member_field(&mut pv, member, field);
        }
    } else {
        for member in &self.def.members {
            if matches!(
                member.mapping,
                FieldMapping::Proc | FieldMapping::Structure
            ) {
                continue;
            }
            let field = self.read_member(member).await?;
            set_member_field(&mut pv, member, field);
        }
    }

    let queue_size = match self.monitor_queue_size {
        Some(size) => size,
        None => GROUP_DEFAULT_QUEUE_SIZE,
    };
    push_record_options(&mut pv, atomic, queue_size);
    Ok(pv)
}
/// Read only the members whose `field_name` appears in `field_names`.
///
/// `Proc` and `Structure` members are always skipped. Unlike the full
/// group read, members are placed strictly by their dotted field name and
/// no `record._options` entry is attached.
///
/// # Errors
/// Returns `BridgeError::PutRejected` when reading the group is denied, and
/// propagates any member read error.
#[allow(dead_code)]
async fn read_partial(&self, field_names: &[String]) -> BridgeResult<PvStructure> {
    if !self.access.can_read(&self.def.name) {
        return Err(BridgeError::PutRejected(format!(
            "read denied for group {} (user='{}' host='{}')",
            self.def.name, self.access.user, self.access.host
        )));
    }

    let mut pv = PvStructure::new(self.def.struct_id.as_deref().unwrap_or("structure"));
    for member in &self.def.members {
        let has_no_value = matches!(
            member.mapping,
            FieldMapping::Proc | FieldMapping::Structure
        );
        let requested = field_names.iter().any(|wanted| wanted == &member.field_name);
        if has_no_value || !requested {
            continue;
        }
        let field = self.read_member(member).await?;
        set_nested_field(&mut pv, &member.field_name, field);
    }
    Ok(pv)
}
/// Produce the value of a member that does not need a record read.
///
/// `Const` yields the configured constant (integer `0` when none is set),
/// `Structure` an empty structure, and `Proc` integer `0`. Any other
/// mapping returns `None`, meaning the caller must read from the record.
fn read_member_channelless(member: &GroupMember) -> Option<PvField> {
    use epics_pva_rs::pvdata::ScalarValue;

    let zero = || PvField::Scalar(ScalarValue::Int(0));
    match member.mapping {
        FieldMapping::Proc => Some(zero()),
        FieldMapping::Structure => Some(PvField::Structure(PvStructure::new(""))),
        FieldMapping::Const => match member.const_value.clone() {
            Some(constant) => Some(constant),
            None => Some(zero()),
        },
        _ => None,
    }
}
/// Read one member's value, taking the record's read lock for the
/// duration of the decode.
///
/// # Errors
/// `BridgeError::RecordNotFound` when the member's record does not exist,
/// plus whatever `decode_member` reports.
async fn read_member(&self, member: &GroupMember) -> BridgeResult<PvField> {
    if let Some(ready) = Self::read_member_channelless(member) {
        return Ok(ready);
    }

    let (record_name, field_name) =
        epics_base_rs::server::database::parse_pv_name(&member.channel);
    let Some(rec) = self.db.get_record(record_name).await else {
        return Err(BridgeError::RecordNotFound(record_name.to_string()));
    };
    let instance = rec.read().await;
    Self::decode_member(member, record_name, field_name, &instance)
}
/// Read one member's value from an already-locked record.
///
/// `guard_map` maps record names (as written in member channels) to the
/// record instances whose read guards the caller is holding.
///
/// # Errors
/// `BridgeError::RecordNotFound` when the record is absent from
/// `guard_map`, plus whatever `decode_member` reports.
fn read_member_locked(
    &self,
    member: &GroupMember,
    guard_map: &HashMap<&str, &epics_base_rs::server::record::RecordInstance>,
) -> BridgeResult<PvField> {
    if let Some(ready) = Self::read_member_channelless(member) {
        return Ok(ready);
    }

    let (record_name, field_name) =
        epics_base_rs::server::database::parse_pv_name(&member.channel);
    match guard_map.get(record_name).copied() {
        Some(instance) => Self::decode_member(member, record_name, field_name, instance),
        None => Err(BridgeError::RecordNotFound(record_name.to_string())),
    }
}
/// Convert one record field into the PVA representation selected by the
/// member's mapping.
///
/// * `Scalar` — snapshot rendered as the normative-type structure chosen
///   from the record type.
/// * `Plain` / `Any` — the bare field value.
/// * `Meta` — a `meta_t` structure holding `alarm` and `timeStamp`
///   (nanoseconds split according to the member's `nsec_mask`).
///
/// # Errors
/// `BridgeError::FieldNotFound` when the record cannot supply the field.
///
/// # Panics
/// On `Proc`, `Structure`, or `Const` mappings; callers are expected to
/// have handled those through `read_member_channelless` first.
fn decode_member(
    member: &GroupMember,
    record_name: &str,
    field_name: &str,
    instance: &epics_base_rs::server::record::RecordInstance,
) -> BridgeResult<PvField> {
    // Shared "no such field" error for every arm below.
    let missing_field = || BridgeError::FieldNotFound {
        record: record_name.to_string(),
        field: field_name.to_string(),
    };

    match member.mapping {
        FieldMapping::Plain | FieldMapping::Any => {
            let value = instance
                .resolve_field(field_name)
                .ok_or_else(missing_field)?;
            Ok(epics_to_pv_field(&value))
        }
        FieldMapping::Scalar => {
            let snapshot = instance
                .snapshot_for_field(field_name)
                .ok_or_else(missing_field)?;
            let nt_type = NtType::from_record_type(instance.record.record_type());
            let rendered = pvif::snapshot_to_pv_structure(&snapshot, nt_type);
            Ok(PvField::Structure(rendered))
        }
        FieldMapping::Meta => {
            let snapshot = instance
                .snapshot_for_field(field_name)
                .ok_or_else(missing_field)?;
            let alarm = build_alarm_from_snapshot(&snapshot);
            let time_stamp = build_timestamp_from_snapshot_masked(&snapshot, member.nsec_mask);
            let mut meta = PvStructure::new("meta_t");
            meta.fields
                .push(("alarm".into(), PvField::Structure(alarm)));
            meta.fields
                .push(("timeStamp".into(), PvField::Structure(time_stamp)));
            Ok(PvField::Structure(meta))
        }
        FieldMapping::Proc | FieldMapping::Structure | FieldMapping::Const => unreachable!(),
    }
}
/// Determine the normative type (from the record type) and the scalar
/// type (from the field's DBF type) for a member.
///
/// The field is matched by its upper-cased name in the record's field
/// list; an unlisted field is treated as `DbFieldType::Double`.
///
/// # Errors
/// `BridgeError::RecordNotFound` when the member's record does not exist.
async fn introspect_member(&self, member: &GroupMember) -> BridgeResult<(NtType, ScalarType)> {
    let (record_name, field_name) =
        epics_base_rs::server::database::parse_pv_name(&member.channel);
    let Some(rec) = self.db.get_record(record_name).await else {
        return Err(BridgeError::RecordNotFound(record_name.to_string()));
    };
    let instance = rec.read().await;

    let nt_type = NtType::from_record_type(instance.record.record_type());

    let wanted = field_name.to_ascii_uppercase();
    let mut value_dbf = DbFieldType::Double;
    for f in instance.record.field_list().iter() {
        if f.name == wanted {
            value_dbf = f.dbf_type;
            break;
        }
    }
    Ok((nt_type, dbf_to_scalar_type(value_dbf)))
}
/// Look up the DBF type of the record field behind `member`.
///
/// Falls back to `DbFieldType::Double` when the record is missing or the
/// (upper-cased) field name is not in the record's field list.
async fn member_dbf_type(&self, member: &GroupMember) -> DbFieldType {
    let (record_name, field_name) =
        epics_base_rs::server::database::parse_pv_name(&member.channel);
    let Some(rec) = self.db.get_record(record_name).await else {
        return DbFieldType::Double;
    };
    let instance = rec.read().await;

    let wanted = field_name.to_ascii_uppercase();
    for f in instance.record.field_list().iter() {
        if f.name == wanted {
            return f.dbf_type;
        }
    }
    DbFieldType::Double
}
/// Convert an incoming PVA field into the EPICS value to write for
/// `member`.
///
/// Scalars are converted to the DBF type of the target record field;
/// anything else goes through the generic `pv_field_to_epics` conversion,
/// which may return `None`.
async fn convert_member_value(
    &self,
    member: &GroupMember,
    pv_field: &epics_pva_rs::pvdata::PvField,
) -> Option<epics_base_rs::types::EpicsValue> {
    use epics_pva_rs::pvdata::PvField;

    if let PvField::Scalar(scalar) = pv_field {
        let target = self.member_dbf_type(member).await;
        return Some(crate::convert::scalar_to_epics_typed(scalar, target));
    }
    crate::convert::pv_field_to_epics(pv_field)
}
/// Write `value` into the group's member records.
///
/// Only members that declare a `put_order` take part; they are written in
/// ascending `put_order`. `opts.process` selects between the processing
/// put path and the plain `put_pv` path (`Inhibit` selects the latter).
/// `atomic_override`, when given, replaces the group's configured
/// atomicity for this call.
///
/// # Errors
/// Returns `BridgeError::PutRejected` when group-level or per-member write
/// access is denied, when a participating member targets a link field, or
/// when an underlying database write/process call fails.
pub async fn put_with_options(
&self,
value: &PvStructure,
opts: super::channel::PutOptions,
atomic_override: Option<bool>,
) -> BridgeResult<()> {
// Group-level write permission gate.
if !self.access.can_write(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"write denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
let use_process = opts.process != super::channel::ProcessMode::Inhibit;
let atomic = atomic_override.unwrap_or(self.def.atomic);
// Members without a put_order never receive writes.
let mut ordered: Vec<(&GroupMember, i32)> = self
.def
.members
.iter()
.filter_map(|m| m.put_order.map(|po| (m, po)))
.collect();
ordered.sort_by_key(|(_, po)| *po);
let ordered: Vec<&GroupMember> = ordered.into_iter().map(|(m, _)| m).collect();
// Validate everything before touching any record: first link-field targets...
for m in &ordered {
if member_targets_link_field(&m.channel) {
return Err(BridgeError::PutRejected(format!(
"group {} PUT: member '{}' targets link field '{}' \
(pvxs groupsource.cpp:548 rejects link-class field writes)",
self.def.name, m.field_name, m.channel
)));
}
}
// ...then per-member write permission (members without a channel are exempt).
for m in &ordered {
if m.channel.is_empty() {
continue;
}
if !self.access.can_write(&m.channel) {
return Err(BridgeError::PutRejected(format!(
"group {} PUT: member '{}' field '{}' write denied for \
user='{}' host='{}' (per-member ACF, pvxs \
groupsource.cpp:161)",
self.def.name, m.field_name, m.channel, self.access.user, self.access.host
)));
}
}
if atomic {
// Lock all member records first, then the group's atomic-write lock;
// both guards are held until the end of this branch.
let member_records = group_member_record_names(&self.db, &self.def.members).await;
let _many_guard = self.db.lock_records(&member_records).await;
let _atomic_guard = self.def.atomic_write_lock.lock().await;
// Phase 1: convert every value up front. `None` marks either a Proc
// member or a member with nothing usable in `value`.
let mut writes: Vec<(&GroupMember, Option<epics_base_rs::types::EpicsValue>)> =
Vec::new();
for member in &ordered {
if member.mapping == FieldMapping::Proc {
writes.push((member, None));
continue;
}
if member.mapping == FieldMapping::Structure
|| member.mapping == FieldMapping::Const
{
continue; }
let epics_val = match get_nested_field(value, &member.field_name) {
Some(pv_field) => self.convert_member_value(member, &pv_field).await,
None => None,
};
writes.push((member, epics_val));
}
// Phase 2: apply the writes using the `*_already_locked` database calls.
for (member, val) in writes {
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
if member.mapping == FieldMapping::Proc {
self.db
.process_record_already_locked(record_name)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
} else if let Some(epics_val) = val {
if use_process {
self.db
.put_record_field_from_ca_already_locked(
record_name,
field_name,
epics_val,
)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
} else {
self.db
.put_pv_already_locked(
&format!("{record_name}.{field_name}"),
epics_val,
)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
}
}
}
} else {
// Non-atomic: each member is converted and written independently.
for member in ordered {
if member.mapping == FieldMapping::Structure
|| member.mapping == FieldMapping::Const
{
continue; }
let (record_name, field_name) =
epics_base_rs::server::database::parse_pv_name(&member.channel);
if member.mapping == FieldMapping::Proc {
self.db
.process_record(record_name)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
continue;
}
// Members absent from `value`, or not convertible, are skipped silently.
let pv_field = match get_nested_field(value, &member.field_name) {
Some(f) => f,
None => continue,
};
let epics_val = match self.convert_member_value(member, &pv_field).await {
Some(v) => v,
None => continue,
};
if use_process {
self.db
.put_record_field_from_ca(record_name, field_name, epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
} else {
self.db
.put_pv(&format!("{record_name}.{field_name}"), epics_val)
.await
.map_err(|e| BridgeError::PutRejected(e.to_string()))?;
}
}
}
Ok(())
}
}
impl super::provider::Channel for GroupChannel {
/// Name of the group PV this channel serves.
fn channel_name(&self) -> &str {
&self.def.name
}
/// Read the group and return only the parts selected by `request`.
///
/// Atomicity comes from the request when it specifies one, otherwise from
/// the group definition.
///
/// # Errors
/// `BridgeError::PutRejected` when reading the group is denied; member
/// read errors are propagated.
async fn get(&self, request: &PvStructure) -> BridgeResult<PvStructure> {
    if !self.access.can_read(&self.def.name) {
        return Err(BridgeError::PutRejected(format!(
            "read denied for group {} (user='{}' host='{}')",
            self.def.name, self.access.user, self.access.host
        )));
    }

    let atomic = match super::channel::atomic_from_pv_request(request) {
        Some(requested) => requested,
        None => self.def.atomic,
    };
    self.read_group_atomic(atomic)
        .await
        .map(|full| pvif::filter_by_request(&full, request))
}
/// Write `value` to the group, deriving both the put options and the
/// optional atomicity override from `value` itself.
async fn put(&self, value: &PvStructure) -> BridgeResult<()> {
    self.put_with_options(
        value,
        super::channel::PutOptions::from_pv_request(value),
        super::channel::atomic_from_pv_request(value),
    )
    .await
}
/// Build the type description of the group structure.
///
/// `Proc` members contribute nothing. `Structure` and `Const` members are
/// described without touching the database; all other mappings introspect
/// the backing record. A member-level `struct_id` overrides the id of any
/// structure descriptor produced for that member.
///
/// # Errors
/// Propagates errors from `introspect_member` (e.g. a missing record).
async fn get_field(&self) -> BridgeResult<FieldDesc> {
let struct_id = self.def.struct_id.as_deref().unwrap_or("structure");
let mut fields: Vec<(String, FieldDesc)> = Vec::new();
for member in &self.def.members {
if member.mapping == FieldMapping::Proc {
continue;
}
let mut desc = match member.mapping {
FieldMapping::Structure => {
let sid = member.struct_id.as_deref().unwrap_or("");
FieldDesc::Structure {
struct_id: sid.into(),
fields: Vec::new(),
}
}
FieldMapping::Const => {
// Without a configured constant the member is described as an int scalar.
match &member.const_value {
Some(pv_field) => pv_field_to_field_desc(pv_field),
None => FieldDesc::Scalar(ScalarType::Int),
}
}
_ => {
let (nt_type, scalar_type) = self.introspect_member(member).await?;
match member.mapping {
FieldMapping::Scalar => pvif::build_field_desc_for_nt(nt_type, scalar_type),
FieldMapping::Plain => FieldDesc::Scalar(scalar_type),
FieldMapping::Meta => meta_desc(),
FieldMapping::Any => FieldDesc::Scalar(scalar_type),
// Remaining mappings were handled by the outer match; skip the member.
_ => continue,
}
}
};
// A per-member struct id wins over whatever id the descriptor came with.
if let Some(member_id) = &member.struct_id
&& let FieldDesc::Structure { struct_id, .. } = &mut desc
{
*struct_id = member_id.clone();
}
set_member_field_desc(&mut fields, member, desc);
}
Ok(FieldDesc::Structure {
struct_id: struct_id.into(),
fields,
})
}
/// Create a (not yet started) group monitor sharing this channel's
/// database, definition, and access context.
///
/// # Errors
/// `BridgeError::PutRejected` when reading the group is denied.
async fn create_monitor(&self) -> BridgeResult<AnyMonitor> {
    if self.access.can_read(&self.def.name) {
        let monitor = GroupMonitor::new(self.db.clone(), self.def.clone())
            .with_access(self.access.clone());
        return Ok(AnyMonitor::Group(Box::new(monitor)));
    }
    Err(BridgeError::PutRejected(format!(
        "monitor create denied for group {} (user='{}' host='{}')",
        self.def.name, self.access.user, self.access.host
    )))
}
}
/// Which of a member's subscriptions produced an event.
#[derive(Debug, Clone, Copy)]
enum MemberEventKind {
// From the VALUE | ALARM | LOG subscription.
Value,
// From the PROPERTY subscription.
Property,
}
/// Notification sent from a member's subscription task to the monitor.
struct MemberEvent {
// Index of the member within the group definition's `members` list.
member_index: usize,
// Subscription that fired.
kind: MemberEventKind,
}
/// Per-member record of which initial events have been seen (or are not
/// expected); the monitor is "primed" once both flags are true for every
/// member.
#[derive(Debug, Clone)]
struct FieldPrimingState {
had_value_event: bool,
had_property_event: bool,
}
/// Owns the abort handle of a spawned member-subscription task and aborts
/// that task when dropped.
pub struct MemberTaskGuard(tokio::task::AbortHandle);
impl Drop for MemberTaskGuard {
fn drop(&mut self) {
self.0.abort();
}
}
/// Monitor over a group PV: watches every member record and re-reads the
/// whole group when a relevant member event arrives.
pub struct GroupMonitor {
db: Arc<PvDatabase>,
def: GroupPvDef,
// True between a successful `start` and the next `stop`.
running: bool,
// Channel used to read group snapshots; set by `start`.
group_channel: Option<GroupChannel>,
// Snapshot captured by `start` when priming was already complete; handed
// out by the first `poll`.
initial_snapshot: Option<PvStructure>,
// Receiving end of the member-event channel fed by the spawned tasks.
event_rx: Option<tokio::sync::mpsc::Receiver<MemberEvent>>,
// Guards that abort the subscription tasks when cleared or dropped.
_tasks: Vec<MemberTaskGuard>,
access: super::provider::AccessContext,
// One priming entry per group member, in member order.
priming: Vec<FieldPrimingState>,
// Set once every priming entry has both flags true.
events_primed: bool,
// Queue size passed on to the internal `GroupChannel`.
monitor_queue_size: i32,
}
impl GroupMonitor {
/// Create a stopped monitor for `def` with an allow-all access context
/// and the default queue size.
///
/// The initial priming state per member depends on its mapping:
/// `Const`/`Structure`/`Proc` members expect no events, `Scalar`/`Meta`
/// members expect both a value and a property event, and all other
/// mappings expect only a value event.
pub fn new(db: Arc<PvDatabase>, def: GroupPvDef) -> Self {
    let mut priming: Vec<FieldPrimingState> = Vec::with_capacity(def.members.len());
    for member in &def.members {
        let (had_value_event, had_property_event) = match member.mapping {
            FieldMapping::Const | FieldMapping::Structure | FieldMapping::Proc => (true, true),
            FieldMapping::Scalar | FieldMapping::Meta => (false, false),
            _ => (false, true),
        };
        priming.push(FieldPrimingState {
            had_value_event,
            had_property_event,
        });
    }

    Self {
        db,
        def,
        running: false,
        group_channel: None,
        initial_snapshot: None,
        event_rx: None,
        _tasks: Vec::new(),
        access: super::provider::AccessContext::allow_all(),
        priming,
        events_primed: false,
        monitor_queue_size: GROUP_DEFAULT_QUEUE_SIZE,
    }
}
/// Builder-style setter replacing the access context.
pub fn with_access(mut self, access: super::provider::AccessContext) -> Self {
self.access = access;
self
}
/// Builder-style setter for the queue size handed to the internal
/// `GroupChannel` when the monitor starts.
pub fn with_queue_size(mut self, queue_size: i32) -> Self {
self.monitor_queue_size = queue_size;
self
}
}
impl super::provider::PvaMonitor for GroupMonitor {
/// Subscribe to every member record and arm the monitor.
///
/// Calling `start` on a running monitor is a no-op. For each member with a
/// channel, one task forwards VALUE/ALARM/LOG events, and `Scalar`/`Meta`
/// members get a second task forwarding PROPERTY events. If a subscription
/// cannot be created, the corresponding priming flag is set so the monitor
/// does not wait for events that will never come.
///
/// # Errors
/// `BridgeError::PutRejected` when reading the group is denied.
async fn start(&mut self) -> BridgeResult<()> {
if self.running {
return Ok(());
}
if !self.access.can_read(&self.def.name) {
return Err(BridgeError::PutRejected(format!(
"monitor read denied for group {} (user='{}' host='{}')",
self.def.name, self.access.user, self.access.host
)));
}
// Event channel capacity: four slots per member, but at least 64.
let cap = (self.def.members.len() * 4).max(64);
let (tx, rx) = tokio::sync::mpsc::channel::<MemberEvent>(cap);
for (idx, member) in self.def.members.iter().enumerate() {
if member.channel.is_empty() {
continue; }
let (record_name, _) = epics_base_rs::server::database::parse_pv_name(&member.channel);
let value_mask = (epics_base_rs::server::recgbl::EventMask::VALUE
| epics_base_rs::server::recgbl::EventMask::ALARM
| epics_base_rs::server::recgbl::EventMask::LOG)
.bits();
if let Some(mut sub) =
DbSubscription::subscribe_with_mask(&self.db, record_name, 0, value_mask).await
{
let tx = tx.clone();
// Forward each snapshot as a Value event until the subscription ends
// or the receiver is gone.
let handle = tokio::spawn(async move {
while sub.recv_snapshot().await.is_some() {
if tx
.send(MemberEvent {
member_index: idx,
kind: MemberEventKind::Value,
})
.await
.is_err()
{
break;
}
}
});
self._tasks.push(MemberTaskGuard(handle.abort_handle()));
} else {
// No subscription: do not wait for a value event from this member.
if let Some(state) = self.priming.get_mut(idx) {
state.had_value_event = true;
}
}
if member.mapping == FieldMapping::Scalar || member.mapping == FieldMapping::Meta {
let prop_mask = epics_base_rs::server::recgbl::EventMask::PROPERTY.bits();
if let Some(mut sub) =
DbSubscription::subscribe_with_mask(&self.db, record_name, 0, prop_mask).await
{
let tx = tx.clone();
let handle = tokio::spawn(async move {
while sub.recv_snapshot().await.is_some() {
if tx
.send(MemberEvent {
member_index: idx,
kind: MemberEventKind::Property,
})
.await
.is_err()
{
break;
}
}
});
self._tasks.push(MemberTaskGuard(handle.abort_handle()));
} else {
// No subscription: do not wait for a property event from this member.
if let Some(state) = self.priming.get_mut(idx) {
state.had_property_event = true;
}
}
}
}
let group_channel = GroupChannel::new(self.db.clone(), self.def.clone())
.with_access(self.access.clone())
.with_monitor_queue_size(self.monitor_queue_size);
let all_primed = self
.priming
.iter()
.all(|p| p.had_value_event && p.had_property_event);
// Nothing left to wait for: capture the first snapshot now. A failed
// read is ignored, but the monitor is still marked primed.
if all_primed {
if let Ok(snapshot) = group_channel.read_group().await {
self.initial_snapshot = Some(snapshot);
}
self.events_primed = true;
}
self.group_channel = Some(group_channel);
self.event_rx = Some(rx);
self.running = true;
Ok(())
}
/// Wait for the next group update.
///
/// A snapshot captured by `start` is returned first. Until the monitor is
/// primed, incoming events only update the priming flags; the event that
/// completes priming yields a fresh group read. After that, a property
/// event always triggers a group read, while a value event does so only
/// when the member's trigger definition is not `TriggerDef::None`.
///
/// Returns `None` when the monitor is not started, the event channel is
/// closed, or the group read fails.
async fn poll(&mut self) -> Option<PvStructure> {
    if let Some(initial) = self.initial_snapshot.take() {
        return Some(initial);
    }

    let rx = self.event_rx.as_mut()?;
    while let Some(event) = rx.recv().await {
        if !self.events_primed {
            if let Some(state) = self.priming.get_mut(event.member_index) {
                match event.kind {
                    MemberEventKind::Value => state.had_value_event = true,
                    MemberEventKind::Property => state.had_property_event = true,
                }
            }
            let still_waiting = self
                .priming
                .iter()
                .any(|p| !(p.had_value_event && p.had_property_event));
            if still_waiting {
                continue;
            }
            self.events_primed = true;
            let group_channel = self.group_channel.as_ref()?;
            return group_channel.read_group().await.ok();
        }

        let Some(member) = self.def.members.get(event.member_index) else {
            continue;
        };
        let group_channel = self.group_channel.as_ref()?;
        let is_property = matches!(event.kind, MemberEventKind::Property);
        let has_trigger = !matches!(member.triggers, TriggerDef::None);
        if is_property || has_trigger {
            return group_channel.read_group().await.ok();
        }
    }
    None
}
/// Tear the monitor down and return it to its freshly-constructed state.
///
/// Drops the event receiver, aborts all subscription tasks (by dropping
/// their guards), clears the channel and any pending snapshot, and resets
/// each member's priming flags to the values `new` assigns for its mapping.
async fn stop(&mut self) {
    self.event_rx = None;
    self._tasks.clear();
    self.running = false;
    self.group_channel = None;
    self.initial_snapshot = None;
    self.events_primed = false;

    for (state, member) in self.priming.iter_mut().zip(&self.def.members) {
        let (value_seen, property_seen) = match member.mapping {
            FieldMapping::Const | FieldMapping::Structure | FieldMapping::Proc => (true, true),
            FieldMapping::Scalar | FieldMapping::Meta => (false, false),
            _ => (false, true),
        };
        state.had_value_event = value_seen;
        state.had_property_event = property_seen;
    }
}
}
/// Either kind of monitor this module hands out, boxed so both variants
/// have the same size.
pub enum AnyMonitor {
// Monitor implemented by `BridgeMonitor`.
Single(Box<BridgeMonitor>),
// Monitor over a group PV.
Group(Box<GroupMonitor>),
}
impl AnyMonitor {
    /// Apply `queue_size` to a group monitor; a single monitor is returned
    /// unchanged.
    pub fn with_queue_size(self, queue_size: i32) -> Self {
        match self {
            Self::Single(inner) => Self::Single(inner),
            Self::Group(inner) => Self::Group(Box::new(inner.with_queue_size(queue_size))),
        }
    }
}
// Every method simply forwards to whichever monitor the enum wraps.
impl super::provider::PvaMonitor for AnyMonitor {
/// Forward to the wrapped monitor's `poll`.
async fn poll(&mut self) -> Option<PvStructure> {
match self {
Self::Single(m) => m.poll().await,
Self::Group(m) => m.poll().await,
}
}
/// Forward to the wrapped monitor's `start`.
async fn start(&mut self) -> BridgeResult<()> {
match self {
Self::Single(m) => m.start().await,
Self::Group(m) => m.start().await,
}
}
/// Forward to the wrapped monitor's `stop`.
async fn stop(&mut self) {
match self {
Self::Single(m) => m.stop().await,
Self::Group(m) => m.stop().await,
}
}
}
fn member_targets_link_field(channel: &str) -> bool {
let (_, field) = epics_base_rs::server::database::parse_pv_name(channel);
let f = field.to_ascii_uppercase();
if matches!(f.as_str(), "FLNK" | "DOL" | "SDIS" | "INP" | "OUT") {
return true;
}
if let Some(rest) = f.strip_prefix("INP")
&& rest.len() == 1
&& rest.chars().next().unwrap().is_ascii_uppercase()
{
return true;
}
if let Some(rest) = f.strip_prefix("OUT")
&& rest.len() == 1
&& rest.chars().next().unwrap().is_ascii_uppercase()
{
return true;
}
false
}
// Unit tests for `member_targets_link_field`.
#[cfg(test)]
mod link_field_tests {
use super::member_targets_link_field;
// Fixed link names plus single-letter INP*/OUT* variants must be flagged.
#[test]
fn link_class_field_names_rejected() {
for f in ["FLNK", "DOL", "SDIS", "INP", "OUT", "INPA", "OUTL", "INPZ"] {
assert!(
member_targets_link_field(&format!("REC.{f}")),
"expected {f} to be classified as a link field"
);
}
}
// Ordinary value/metadata fields must not be flagged.
#[test]
fn value_class_field_names_allowed() {
for f in ["VAL", "DESC", "EGU", "PREC", "SCAN", "HIHI", "LOLO", "RVAL"] {
assert!(
!member_targets_link_field(&format!("REC.{f}")),
"expected {f} to be classified as a value field, not a link"
);
}
}
// A channel with no explicit field part must not be flagged.
#[test]
fn bare_record_default_is_val_not_link() {
assert!(!member_targets_link_field("REC"));
}
}
fn meta_desc() -> FieldDesc {
FieldDesc::Structure {
struct_id: "meta_t".into(),
fields: vec![
(
"alarm".into(),
FieldDesc::Structure {
struct_id: "alarm_t".into(),
fields: vec![
("severity".into(), FieldDesc::Scalar(ScalarType::Int)),
("status".into(), FieldDesc::Scalar(ScalarType::Int)),
("message".into(), FieldDesc::Scalar(ScalarType::String)),
],
},
),
(
"timeStamp".into(),
FieldDesc::Structure {
struct_id: "time_t".into(),
fields: vec![
(
"secondsPastEpoch".into(),
FieldDesc::Scalar(ScalarType::Long),
),
("nanoseconds".into(), FieldDesc::Scalar(ScalarType::Int)),
("userTag".into(), FieldDesc::Scalar(ScalarType::Int)),
],
},
),
],
}
}
fn pv_field_to_field_desc(field: &PvField) -> FieldDesc {
use epics_pva_rs::pvdata::ScalarValue;
match field {
PvField::Scalar(sv) => FieldDesc::Scalar(match sv {
ScalarValue::Boolean(_) => ScalarType::Boolean,
ScalarValue::Byte(_) => ScalarType::Byte,
ScalarValue::Short(_) => ScalarType::Short,
ScalarValue::Int(_) => ScalarType::Int,
ScalarValue::Long(_) => ScalarType::Long,
ScalarValue::UByte(_) => ScalarType::UByte,
ScalarValue::UShort(_) => ScalarType::UShort,
ScalarValue::UInt(_) => ScalarType::UInt,
ScalarValue::ULong(_) => ScalarType::ULong,
ScalarValue::Float(_) => ScalarType::Float,
ScalarValue::Double(_) => ScalarType::Double,
ScalarValue::String(_) => ScalarType::String,
}),
PvField::ScalarArray(arr) => {
let elem_type = arr
.first()
.map(|sv| match sv {
ScalarValue::Boolean(_) => ScalarType::Boolean,
ScalarValue::Byte(_) => ScalarType::Byte,
ScalarValue::Short(_) => ScalarType::Short,
ScalarValue::Int(_) => ScalarType::Int,
ScalarValue::Long(_) => ScalarType::Long,
ScalarValue::UByte(_) => ScalarType::UByte,
ScalarValue::UShort(_) => ScalarType::UShort,
ScalarValue::UInt(_) => ScalarType::UInt,
ScalarValue::ULong(_) => ScalarType::ULong,
ScalarValue::Float(_) => ScalarType::Float,
ScalarValue::Double(_) => ScalarType::Double,
ScalarValue::String(_) => ScalarType::String,
})
.unwrap_or(ScalarType::Double);
FieldDesc::ScalarArray(elem_type)
}
PvField::ScalarArrayTyped(arr) => FieldDesc::ScalarArray(arr.scalar_type()),
PvField::Structure(s) => FieldDesc::Structure {
struct_id: s.struct_id.clone(),
fields: s
.fields
.iter()
.map(|(name, f)| (name.clone(), pv_field_to_field_desc(f)))
.collect(),
},
PvField::StructureArray(_)
| PvField::Union { .. }
| PvField::UnionArray(_)
| PvField::Variant(_)
| PvField::VariantArray(_)
| PvField::Null => FieldDesc::Structure {
struct_id: String::new(),
fields: Vec::new(),
},
}
}
/// Build an `alarm_t` structure from a snapshot's alarm state.
///
/// `severity` and `status` are cast to `i32`; `message` is always the
/// empty string.
fn build_alarm_from_snapshot(snapshot: &epics_base_rs::server::snapshot::Snapshot) -> PvStructure {
    use epics_pva_rs::pvdata::ScalarValue;

    let severity = ScalarValue::Int(snapshot.alarm.severity as i32);
    let status = ScalarValue::Int(snapshot.alarm.status as i32);
    let message = ScalarValue::String(String::new());

    let mut alarm = PvStructure::new("alarm_t");
    for (name, value) in [
        ("severity", severity),
        ("status", status),
        ("message", message),
    ] {
        alarm.fields.push((name.into(), PvField::Scalar(value)));
    }
    alarm
}
/// Builds the `time_t` structure for a record snapshot, optionally moving
/// some nanosecond bits into `userTag`.
///
/// The snapshot timestamp is expressed relative to `UNIX_EPOCH`; a timestamp
/// before the epoch yields zero seconds and zero nanoseconds.
///
/// The bits of the nanosecond count selected by `nsec_mask` are reported in
/// `userTag` and cleared from `nanoseconds`. With `nsec_mask == 0` the
/// nanoseconds pass through untouched and `userTag` is `0`.
fn build_timestamp_from_snapshot_masked(
    snapshot: &epics_base_rs::server::snapshot::Snapshot,
    nsec_mask: u32,
) -> PvStructure {
    use epics_pva_rs::pvdata::ScalarValue;
    use std::time::UNIX_EPOCH;

    let (secs, raw_nanos) = snapshot
        .timestamp
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| (since_epoch.as_secs() as i64, since_epoch.subsec_nanos()))
        .unwrap_or((0, 0));

    // A zero mask is covered by the arithmetic itself: `x & !0 == x` and
    // `x & 0 == 0`, so no separate branch is needed.
    let user_tag = (raw_nanos & nsec_mask) as i32;
    let nanos = (raw_nanos & !nsec_mask) as i32;

    let members = [
        ("secondsPastEpoch", ScalarValue::Long(secs)),
        ("nanoseconds", ScalarValue::Int(nanos)),
        ("userTag", ScalarValue::Int(user_tag)),
    ];

    let mut out = PvStructure::new("time_t");
    for (name, value) in members {
        out.fields.push((name.into(), PvField::Scalar(value)));
    }
    out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::qsrv::provider::Channel;
use std::time::Duration;
/// A single-component path creates the field directly on the root structure.
#[test]
fn nested_field_set_simple() {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut root = PvStructure::new("test");
    let value = PvField::Scalar(ScalarValue::Int(42));
    set_nested_field(&mut root, "x", value);

    assert!(root.get_field("x").is_some());
}
/// A three-component path creates the intermediate structures `a` and `a.b`.
#[test]
fn nested_field_set_deep() {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut root = PvStructure::new("test");
    set_nested_field(
        &mut root,
        "a.b.c",
        PvField::Scalar(ScalarValue::Double(2.5)),
    );

    let a = root.get_field("a");
    assert!(a.is_some());
    let a_struct = match a {
        Some(PvField::Structure(inner)) => inner,
        _ => panic!("expected a structure"),
    };
    let b_struct = match a_struct.get_field("b") {
        Some(PvField::Structure(inner)) => inner,
        _ => panic!("expected b structure"),
    };
    assert!(b_struct.get_field("c").is_some());
}
/// A value written through a dotted path is read back unchanged.
#[test]
fn nested_field_roundtrip() {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut root = PvStructure::new("test");
    set_nested_field(&mut root, "a.b", PvField::Scalar(ScalarValue::Int(99)));

    let found = get_nested_field(&root, "a.b");
    assert!(found.is_some());
    match found.as_deref() {
        Some(PvField::Scalar(ScalarValue::Int(v))) => assert_eq!(*v, 99),
        _ => panic!("expected Int(99)"),
    }
}
/// Writing the same path twice keeps only the most recent value.
#[test]
fn nested_field_overwrite() {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut root = PvStructure::new("test");
    for value in [1, 2] {
        set_nested_field(&mut root, "x.y", PvField::Scalar(ScalarValue::Int(value)));
    }

    match get_nested_field(&root, "x.y").as_deref() {
        Some(PvField::Scalar(ScalarValue::Int(v))) => assert_eq!(*v, 2),
        _ => panic!("expected Int(2)"),
    }
}
/// Two leaves under the same parent structure can both be found afterwards.
#[test]
fn nested_field_siblings() {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut root = PvStructure::new("test");
    for (path, value) in [("a.x", 1), ("a.y", 2)] {
        set_nested_field(&mut root, path, PvField::Scalar(ScalarValue::Int(value)));
    }

    for path in ["a.x", "a.y"] {
        assert!(get_nested_field(&root, path).is_some());
    }
}
/// Indexing the leaf of a scalar array yields that element as a scalar;
/// an index past the end yields `None`.
#[test]
fn nested_field_scalar_array_index() {
    use epics_pva_rs::pvdata::ScalarValue;

    let samples = vec![1.5, 2.5, 3.5]
        .into_iter()
        .map(ScalarValue::Double)
        .collect();
    let mut root = PvStructure::new("test");
    root.fields
        .push(("samples".into(), PvField::ScalarArray(samples)));

    let second = get_nested_field(&root, "samples[1]");
    match second.as_deref() {
        Some(PvField::Scalar(ScalarValue::Double(v))) => assert_eq!(*v, 2.5),
        other => panic!("expected Scalar(Double(2.5)), got {other:?}"),
    }

    let out_of_range = get_nested_field(&root, "samples[99]");
    assert!(out_of_range.is_none());
}
/// An index on a non-leaf component selects one element of a structure
/// array, and the rest of the path is resolved inside that element.
#[test]
fn nested_field_structure_array_index() {
    use epics_pva_rs::pvdata::ScalarValue;

    // Builds one `entry` element carrying a single `name` string field.
    let entry = |label: &str| {
        let mut elem = PvStructure::new("entry");
        elem.fields.push((
            "name".into(),
            PvField::Scalar(ScalarValue::String(label.into())),
        ));
        elem
    };

    let mut root = PvStructure::new("test");
    root.fields.push((
        "entries".into(),
        PvField::StructureArray(vec![entry("a"), entry("b")]),
    ));

    let found = get_nested_field(&root, "entries[1].name");
    match found.as_deref() {
        Some(PvField::Scalar(ScalarValue::String(s))) => assert_eq!(s, "b"),
        other => panic!("expected Scalar(String(\"b\")), got {other:?}"),
    }
}
/// A single-component path adds exactly one top-level descriptor.
#[test]
fn nested_desc_simple() {
    use epics_pva_rs::pvdata::ScalarType;

    let mut descs: Vec<(String, FieldDesc)> = Vec::new();
    set_nested_field_desc(&mut descs, "x", FieldDesc::Scalar(ScalarType::Double));

    assert_eq!(descs.len(), 1);
    let (name, desc) = &descs[0];
    assert_eq!(name, "x");
    assert!(matches!(desc, FieldDesc::Scalar(ScalarType::Double)));
}
/// Two leaves under the same parent share one nested structure descriptor,
/// in insertion order.
#[test]
fn nested_desc_deep() {
    use epics_pva_rs::pvdata::ScalarType;

    let mut descs: Vec<(String, FieldDesc)> = Vec::new();
    for path in ["axis.position", "axis.velocity"] {
        set_nested_field_desc(&mut descs, path, FieldDesc::Scalar(ScalarType::Double));
    }

    assert_eq!(descs.len(), 1);
    assert_eq!(descs[0].0, "axis");
    match &descs[0].1 {
        FieldDesc::Structure { fields: sub, .. } => {
            // Checks both the number of children and their order.
            let names: Vec<&str> = sub.iter().map(|(name, _)| name.as_str()).collect();
            assert_eq!(names, ["position", "velocity"]);
        }
        _ => panic!("expected nested structure"),
    }
}
/// Describing the same path twice replaces the earlier descriptor instead of
/// appending a second one.
#[test]
fn nested_desc_overwrite() {
    use epics_pva_rs::pvdata::ScalarType;

    let mut descs: Vec<(String, FieldDesc)> = Vec::new();
    for scalar_type in [ScalarType::Int, ScalarType::Double] {
        set_nested_field_desc(&mut descs, "x", FieldDesc::Scalar(scalar_type));
    }

    assert_eq!(descs.len(), 1);
    let (_, desc) = &descs[0];
    assert!(matches!(desc, FieldDesc::Scalar(ScalarType::Double)));
}
/// A flat field and a nested field coexist as two top-level descriptors, in
/// insertion order.
#[test]
fn nested_desc_mixed_depth() {
    use epics_pva_rs::pvdata::ScalarType;

    let mut descs: Vec<(String, FieldDesc)> = Vec::new();
    set_nested_field_desc(&mut descs, "name", FieldDesc::Scalar(ScalarType::String));
    set_nested_field_desc(
        &mut descs,
        "axis.position",
        FieldDesc::Scalar(ScalarType::Double),
    );

    // Checks both the number of top-level entries and their order.
    let top_level: Vec<&str> = descs.iter().map(|(name, _)| name.as_str()).collect();
    assert_eq!(top_level, ["name", "axis"]);
}
/// A bare name parses to one component without an index.
#[test]
fn parse_field_path_simple() {
    let expected = vec![FieldNameComponent {
        name: "abc".to_string(),
        index: None,
    }];
    assert_eq!(parse_field_path("abc"), expected);
}
/// Dots split the path into one un-indexed component per segment.
#[test]
fn parse_field_path_dotted() {
    let comps = parse_field_path("a.b.c");

    let names: Vec<&str> = comps.iter().map(|c| c.name.as_str()).collect();
    assert_eq!(names, ["a", "b", "c"]);
    assert!(comps.iter().all(|c| c.index.is_none()));
}
/// A `[n]` suffix on a middle component is parsed into that component's index.
#[test]
fn parse_field_path_with_index() {
    let comps = parse_field_path("a.b[0].c");

    let parsed: Vec<(&str, Option<u32>)> = comps
        .iter()
        .map(|c| (c.name.as_str(), c.index))
        .collect();
    let expected: [(&str, Option<u32>); 3] = [("a", None), ("b", Some(0)), ("c", None)];
    assert_eq!(parsed, expected);
}
/// A `[n]` suffix on the only component is parsed into its index.
#[test]
fn parse_field_path_index_at_leaf() {
    let expected = vec![FieldNameComponent {
        name: "arr".to_string(),
        index: Some(3),
    }];
    assert_eq!(parse_field_path("arr[3]"), expected);
}
/// Each component of a path can carry its own index.
#[test]
fn parse_field_path_multiple_indices() {
    let comps = parse_field_path("a[1].b[2]");

    let indices: Vec<Option<u32>> = comps.iter().map(|c| c.index).collect();
    let expected: [Option<u32>; 2] = [Some(1), Some(2)];
    assert_eq!(indices, expected);
}
/// Shared fixture: a database holding the `ai` records `A:rec` and `B:rec`,
/// plus the parsed definition of the atomic group `ATOMIC:GRP` whose members
/// `a` and `b` are mapped onto the `VAL` fields of those records.
async fn atomic_group_fixture() -> (Arc<PvDatabase>, GroupPvDef) {
    use epics_base_rs::server::records::ai::AiRecord;

    let db = Arc::new(PvDatabase::new());
    for record_name in ["A:rec", "B:rec"] {
        db.add_record(record_name, Box::new(AiRecord::new(0.0)))
            .await
            .unwrap();
    }

    let cfg = r#"{
"ATOMIC:GRP": {
"+atomic": true,
"a": {"+type": "plain", "+channel": "A:rec.VAL", "+putorder": 0},
"b": {"+type": "plain", "+channel": "B:rec.VAL", "+putorder": 1}
}
}"#;
    let mut parsed = super::super::group_config::parse_group_config(cfg).unwrap();
    let group_def = parsed.pop().unwrap();
    // Every test using this fixture relies on the group being atomic.
    assert!(group_def.atomic);

    (db, group_def)
}
/// Builds a PUT payload carrying double values for the group members `a`
/// and `b`, in that order.
fn atomic_put_value(a: f64, b: f64) -> PvStructure {
    use epics_pva_rs::pvdata::ScalarValue;

    let mut payload = PvStructure::new("structure");
    for (member, value) in [("a", a), ("b", b)] {
        payload
            .fields
            .push((member.into(), PvField::Scalar(ScalarValue::Double(value))));
    }
    payload
}
/// An atomic group PUT must not complete while another holder owns the
/// group's `atomic_write_lock`, and must complete once it is released.
#[tokio::test]
async fn bug4_atomic_put_serializes_on_group_lock() {
    let (db, def) = atomic_group_fixture().await;
    let channel = GroupChannel::new(db.clone(), def.clone());

    // Take the group lock ourselves before the PUT is started.
    let guard = def.atomic_write_lock.clone().lock_owned().await;
    let put_fut = tokio::spawn(async move {
        channel.put(&atomic_put_value(11.0, 22.0)).await.unwrap();
    });

    // Really wait here. Wrapping an already-ready `async {}` in `timeout`
    // returns on the first poll without yielding, so on this current-thread
    // runtime the spawned PUT would never have been polled and the check
    // below would pass no matter what `put` does.
    tokio::time::sleep(Duration::from_millis(150)).await;
    assert!(
        !put_fut.is_finished(),
        "atomic PUT must block while another holder owns atomic_write_lock"
    );

    drop(guard);
    tokio::time::timeout(Duration::from_secs(5), put_fut)
        .await
        .expect("atomic PUT must complete once the lock is free")
        .expect("put task did not panic");

    // Both members must carry the values from the PUT.
    let a = db.get_pv("A:rec.VAL").await.unwrap();
    let b = db.get_pv("B:rec.VAL").await.unwrap();
    match (a, b) {
        (
            epics_base_rs::types::EpicsValue::Double(va),
            epics_base_rs::types::EpicsValue::Double(vb),
        ) => {
            assert_eq!(va, 11.0);
            assert_eq!(vb, 22.0);
        }
        other => panic!("unexpected member values: {other:?}"),
    }
}
/// Runs 200 atomic `read_group` calls alongside 200 direct writes to one
/// member; both tasks must finish within ten seconds.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn critical_atomic_read_group_no_deadlock_under_writer() {
    use epics_base_rs::types::EpicsValue;

    const ROUNDS: u32 = 200;

    let (db, def) = atomic_group_fixture().await;
    let channel = GroupChannel::new(db.clone(), def.clone());

    let writer_db = db.clone();
    let writer = tokio::spawn(async move {
        for round in 0..ROUNDS {
            // Write errors are deliberately ignored; only progress matters.
            let _ = writer_db
                .put_pv("A:rec.VAL", EpicsValue::Double(round as f64))
                .await;
            tokio::task::yield_now().await;
        }
    });

    let reader = tokio::spawn(async move {
        for _ in 0..ROUNDS {
            channel
                .read_group()
                .await
                .expect("atomic read_group must succeed");
            tokio::task::yield_now().await;
        }
    });

    let both_done = async {
        reader.await.expect("reader task panicked");
        writer.await.expect("writer task panicked");
    };
    tokio::time::timeout(Duration::from_secs(10), both_done)
        .await
        .expect("atomic read_group deadlocked under a concurrent writer");
}
/// An atomic `read_group` reports the current value of every member.
#[tokio::test]
async fn critical_atomic_read_group_returns_member_values() {
    use epics_base_rs::types::EpicsValue;
    use epics_pva_rs::pvdata::ScalarValue;

    let (db, def) = atomic_group_fixture().await;
    for (pv_name, value) in [("A:rec.VAL", 7.5), ("B:rec.VAL", 9.25)] {
        db.put_pv(pv_name, EpicsValue::Double(value)).await.unwrap();
    }

    let channel = GroupChannel::new(db.clone(), def);
    let group = channel.read_group().await.unwrap();

    match get_nested_field(&group, "a").as_deref() {
        Some(PvField::Scalar(ScalarValue::Double(v))) => assert_eq!(*v, 7.5),
        other => panic!("member a: expected Double(7.5), got {other:?}"),
    }
    match get_nested_field(&group, "b").as_deref() {
        Some(PvField::Scalar(ScalarValue::Double(v))) => assert_eq!(*v, 9.25),
        other => panic!("member b: expected Double(9.25), got {other:?}"),
    }
}
/// Two atomic PUTs to the same group must both wait for the group lock and,
/// once released, leave the members with values from a single PUT.
#[tokio::test]
async fn bug4_concurrent_atomic_puts_do_not_interleave() {
    let (db, def) = atomic_group_fixture().await;
    let ch1 = GroupChannel::new(db.clone(), def.clone());
    let ch2 = GroupChannel::new(db.clone(), def.clone());

    // Hold the group lock so that neither PUT can proceed yet.
    let guard = def.atomic_write_lock.clone().lock_owned().await;
    let p1 = tokio::spawn(async move {
        ch1.put(&atomic_put_value(1.0, 1.0)).await.unwrap();
    });
    let p2 = tokio::spawn(async move {
        ch2.put(&atomic_put_value(2.0, 2.0)).await.unwrap();
    });

    // Really wait here. `timeout` around an already-ready `async {}`
    // returns at once without yielding, so neither task would have been
    // polled and the assertion below would be trivially true.
    tokio::time::sleep(Duration::from_millis(120)).await;
    assert!(!p1.is_finished() && !p2.is_finished());

    drop(guard);
    tokio::time::timeout(Duration::from_secs(5), async {
        p1.await.unwrap();
        p2.await.unwrap();
    })
    .await
    .expect("both atomic PUTs must complete");

    // Whichever PUT ran last, both members must come from the same PUT.
    let a = db.get_pv("A:rec.VAL").await.unwrap();
    let b = db.get_pv("B:rec.VAL").await.unwrap();
    match (a, b) {
        (
            epics_base_rs::types::EpicsValue::Double(va),
            epics_base_rs::types::EpicsValue::Double(vb),
        ) => {
            assert_eq!(
                va, vb,
                "atomic group must not be half-applied: a={va} b={vb}"
            );
            assert!(va == 1.0 || va == 2.0);
        }
        other => panic!("unexpected member values: {other:?}"),
    }
}
/// While the gates of both member records are held via `lock_records`, a
/// direct write to one member must not complete; it must complete once the
/// gates are released.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn br_r15_atomic_group_excludes_direct_member_write() {
    let (db, _def) = atomic_group_fixture().await;

    // Hold the gates of both member records.
    let many = db.lock_records(["A:rec", "B:rec"]).await;
    let db_w = db.clone();
    let direct = tokio::spawn(async move {
        db_w.put_record_field_from_ca(
            "A:rec",
            "VAL",
            epics_base_rs::types::EpicsValue::Double(99.0),
        )
        .await
        .unwrap();
    });

    // Really wait here. `timeout` around an already-ready `async {}`
    // returns at once, which would turn the check below into a race
    // against the freshly spawned task rather than a test of blocking.
    tokio::time::sleep(Duration::from_millis(150)).await;
    assert!(
        !direct.is_finished(),
        "direct member write must block while the atomic group's \
DBManyLock-equivalent gates are held"
    );

    drop(many);
    tokio::time::timeout(Duration::from_secs(5), direct)
        .await
        .expect("direct write must complete once gates are released")
        .expect("direct write task panicked");

    match db.get_pv("A:rec.VAL").await.unwrap() {
        epics_base_rs::types::EpicsValue::Double(v) => assert_eq!(v, 99.0),
        other => panic!("unexpected A:rec.VAL: {other:?}"),
    }
}
/// While the gate of one member record is held via `lock_record`, an atomic
/// group PUT must not complete; it must complete, with both values applied,
/// once the gate is released.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn br_r15_atomic_put_blocks_on_member_record_gates() {
    let (db, def) = atomic_group_fixture().await;
    let channel = GroupChannel::new(db.clone(), def.clone());

    // Hold the gate of just one of the two member records.
    let held = db.lock_record("B:rec").await;
    let put = tokio::spawn(async move {
        channel.put(&atomic_put_value(5.0, 6.0)).await.unwrap();
    });

    // Really wait here. `timeout` around an already-ready `async {}`
    // returns at once, which would turn the check below into a race
    // against the freshly spawned task rather than a test of blocking.
    tokio::time::sleep(Duration::from_millis(150)).await;
    assert!(
        !put.is_finished(),
        "atomic group PUT must block while a member-record gate is held"
    );

    drop(held);
    tokio::time::timeout(Duration::from_secs(5), put)
        .await
        .expect("atomic PUT must complete once the member gate is free")
        .expect("atomic PUT task panicked");

    let a = db.get_pv("A:rec.VAL").await.unwrap();
    let b = db.get_pv("B:rec.VAL").await.unwrap();
    match (a, b) {
        (
            epics_base_rs::types::EpicsValue::Double(va),
            epics_base_rs::types::EpicsValue::Double(vb),
        ) => {
            assert_eq!(va, 5.0);
            assert_eq!(vb, 6.0);
        }
        other => panic!("unexpected member values: {other:?}"),
    }
}
}