use std::collections::HashMap;
use std::fmt;
use serde::Deserialize;
/// How a member channel's payload is projected into its group field.
///
/// Selected by the member's `"+type"` key; when the key is absent the
/// parser falls back to [`FieldMapping::Plain`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldMapping {
    /// Embed the full structure built from the source payload.
    Scalar,
    /// Embed only the source payload's value.
    Plain,
    /// Embed only the alarm and timeStamp sub-structures.
    Meta,
    /// Converted exactly like [`FieldMapping::Scalar`] by this file.
    Any,
    /// Contributes no field to snapshots or descriptors and is skipped on group PUT.
    Proc,
}
/// Which group fields an update of a member is declared to trigger.
///
/// Parsed from the member's `"+trigger"` key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TriggerDef {
    /// `"*"`: expands to every field of the group.
    All,
    /// A comma-separated list of field names of the same group.
    Fields(Vec<String>),
    /// Key absent or empty: the member triggers nothing.
    None,
}
/// One field of a group PV together with the channel that backs it.
#[derive(Debug, Clone)]
pub struct GroupMember {
    /// Name of the field inside the group structure (the JSON member key).
    pub field_name: String,
    /// Name of the backing PV in the wrapped store (`"+channel"`).
    pub channel: String,
    /// How the channel's payload is projected into the field (`"+type"`).
    pub mapping: FieldMapping,
    /// Fields this member triggers when it updates (`"+trigger"`).
    pub triggers: TriggerDef,
    /// Sort key for group PUT: members are written in ascending order (`"+putorder"`, default 0).
    pub put_order: i32,
    /// Optional per-member type id (`"+id"`).
    pub struct_id: Option<String>,
}
/// Parsed definition of a single group PV.
#[derive(Debug, Clone)]
pub struct GroupPvDef {
    /// Name under which the group is served (the top-level JSON key).
    pub name: String,
    /// Optional structure id of the group (`"+id"`).
    pub struct_id: Option<String>,
    /// Value of the `"+atomic"` flag; `false` when the key is absent.
    pub atomic: bool,
    /// The group's fields.
    pub members: Vec<GroupMember>,
}
/// Error produced while parsing or validating a group configuration.
///
/// Wraps a human-readable message; `Display` prefixes it with `group config: `.
#[derive(Debug)]
pub struct GroupConfigError(String);

impl fmt::Display for GroupConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(message) = self;
        f.write_str("group config: ")?;
        f.write_str(message)
    }
}

impl std::error::Error for GroupConfigError {}

/// Result alias used by the configuration parser in this file.
type Result<T> = std::result::Result<T, GroupConfigError>;

/// Builds a [`GroupConfigError`] from anything convertible into a `String`.
fn err(msg: impl Into<String>) -> GroupConfigError {
    let message: String = msg.into();
    GroupConfigError(message)
}
/// Top-level shape of the JSON document: group name -> raw group definition.
type RawConfig = HashMap<String, RawGroupDef>;

/// Serde view of one group object before validation.
#[derive(Deserialize)]
struct RawGroupDef {
    /// Optional `"+id"` key of the group.
    #[serde(rename = "+id", default)]
    id: Option<String>,
    /// Optional `"+atomic"` key of the group.
    #[serde(rename = "+atomic", default)]
    atomic: Option<bool>,
    /// Every remaining key of the object, read as a member keyed by field name.
    #[serde(flatten)]
    members: HashMap<String, RawMember>,
}
/// Serde view of one group member before validation; every key is optional here.
#[derive(Deserialize)]
struct RawMember {
    /// `"+channel"`: backing channel name (required later by `parse_member`).
    #[serde(rename = "+channel", default)]
    channel: Option<String>,
    /// `"+type"`: textual mapping name.
    #[serde(rename = "+type", default)]
    mapping: Option<String>,
    /// `"+trigger"`: `"*"`, an empty string, or a comma-separated field list.
    #[serde(rename = "+trigger", default)]
    trigger: Option<String>,
    /// `"+putorder"`: sort key used for group PUT.
    #[serde(rename = "+putorder", default)]
    putorder: Option<i32>,
    /// `"+id"`: optional per-member type id.
    #[serde(rename = "+id", default)]
    id: Option<String>,
}
/// Parses a JSON group configuration into group definitions sorted by name.
///
/// # Errors
///
/// Returns a [`GroupConfigError`] when `json` does not deserialize into the
/// expected shape or when any member fails validation.
pub fn parse_group_config(json: &str) -> Result<Vec<GroupPvDef>> {
    let raw = serde_json::from_str::<RawConfig>(json)
        .map_err(|e| err(format!("invalid JSON: {e}")))?;

    let mut groups = raw
        .into_iter()
        .map(|(name, raw_group)| raw_to_group_def(name, raw_group))
        .collect::<Result<Vec<GroupPvDef>>>()?;

    // Names come from map keys and are therefore unique, so the unstable
    // sort yields the same order as a stable one.
    groups.sort_unstable_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
    Ok(groups)
}
/// Parses a group configuration attached to the record `record_name`.
///
/// Works like [`parse_group_config`], except that a member channel without a
/// `':'` is rewritten to `"{record_name}.{channel}"` before validation.
///
/// # Errors
///
/// Returns a [`GroupConfigError`] when `json` does not deserialize into the
/// expected shape or when any member fails validation.
pub fn parse_info_group(record_name: &str, json: &str) -> Result<Vec<GroupPvDef>> {
    let raw = serde_json::from_str::<RawConfig>(json)
        .map_err(|e| err(format!("invalid JSON: {e}")))?;

    let mut groups: Vec<GroupPvDef> = Vec::with_capacity(raw.len());
    for (name, mut raw_group) in raw {
        for member in raw_group.members.values_mut() {
            match member.channel.as_mut() {
                // No ':' in the name: treat it as relative to the record.
                Some(ch) if !ch.contains(':') => *ch = format!("{record_name}.{ch}"),
                _ => {}
            }
        }
        let group = raw_to_group_def(name, raw_group)?;
        groups.push(group);
    }

    // Names come from map keys and are therefore unique, so the unstable
    // sort yields the same order as a stable one.
    groups.sort_unstable_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
    Ok(groups)
}
pub fn merge_group_defs(existing: &mut HashMap<String, GroupPvDef>, new_defs: Vec<GroupPvDef>) {
for def in new_defs {
existing
.entry(def.name.clone())
.and_modify(|e| {
if def.struct_id.is_some() {
e.struct_id.clone_from(&def.struct_id);
}
e.atomic |= def.atomic;
e.members.extend(def.members.iter().cloned());
})
.or_insert(def);
}
}
fn raw_to_group_def(name: String, raw: RawGroupDef) -> Result<GroupPvDef> {
let mut members = Vec::with_capacity(raw.members.len());
let field_names: Vec<&str> = raw.members.keys().map(|s| s.as_str()).collect();
for (field_name, raw_member) in &raw.members {
members.push(parse_member(field_name, raw_member, &field_names)?);
}
members.sort_by(|a, b| a.field_name.cmp(&b.field_name));
Ok(GroupPvDef {
name,
struct_id: raw.id,
atomic: raw.atomic.unwrap_or(false),
members,
})
}
/// Validates one raw member and converts it into a [`GroupMember`].
///
/// * `field_name`: the member's key inside the group object.
/// * `raw`: the deserialized member.
/// * `all_fields`: every field name of the group, used to validate `"+trigger"`.
///
/// Defaults: `"+type"` -> `Plain`, `"+trigger"` -> `TriggerDef::None`,
/// `"+putorder"` -> `0`.
///
/// # Errors
///
/// Fails when `"+channel"` is missing, when `"+type"` is not a known mapping
/// name, or when a trigger list names a field that is not in `all_fields`.
fn parse_member(field_name: &str, raw: &RawMember, all_fields: &[&str]) -> Result<GroupMember> {
    let Some(channel) = raw.channel.clone() else {
        return Err(err(format!("member '{field_name}' missing +channel")));
    };

    let mapping = match raw.mapping.as_deref().unwrap_or("plain") {
        "plain" => FieldMapping::Plain,
        "scalar" => FieldMapping::Scalar,
        "meta" => FieldMapping::Meta,
        "any" => FieldMapping::Any,
        "proc" => FieldMapping::Proc,
        other => {
            return Err(err(format!(
                "member '{field_name}': unknown +type '{other}'"
            )));
        }
    };

    let triggers = match raw.trigger.as_deref().unwrap_or("") {
        "" => TriggerDef::None,
        "*" => TriggerDef::All,
        spec => {
            // Stops at the first unknown name, in list order.
            let names = spec
                .split(',')
                .map(|part| {
                    let n = part.trim();
                    if all_fields.contains(&n) {
                        Ok(n.to_owned())
                    } else {
                        Err(err(format!(
                            "member '{field_name}': trigger references unknown field '{n}'"
                        )))
                    }
                })
                .collect::<Result<Vec<String>>>()?;
            TriggerDef::Fields(names)
        }
    };

    Ok(GroupMember {
        field_name: field_name.to_owned(),
        channel,
        mapping,
        triggers,
        put_order: raw.putorder.unwrap_or(0),
        struct_id: raw.id.clone(),
    })
}
use std::sync::Arc;
use tokio::sync::mpsc;
use spvirit_codec::spvd_decode::{DecodedValue, FieldDesc, FieldType, StructureDesc, TypeCode};
use spvirit_types::{NtPayload, PvValue, ScalarArrayValue, ScalarValue};
use crate::pvstore::PvStore;
use crate::simple_store::descriptor_for_payload;
/// A [`PvStore`] that layers group PVs on top of another store.
///
/// Names found in `groups` are served as composite structures assembled from
/// the member channels of `inner`; every other name is forwarded to `inner`.
pub struct GroupPvStore<S: PvStore> {
    /// The wrapped store that owns the member channels.
    inner: Arc<S>,
    /// Group definitions keyed by group PV name.
    groups: HashMap<String, GroupPvDef>,
}
impl<S: PvStore> GroupPvStore<S> {
    /// Wraps `inner`, serving every entry of `groups` as an additional PV.
    pub fn new(inner: Arc<S>, groups: HashMap<String, GroupPvDef>) -> Self {
        Self { inner, groups }
    }

    /// Assembles the current value of the group `def`.
    ///
    /// `Proc` members contribute no field. A member whose channel has no
    /// snapshot in the inner store is reported as the scalar `I32(0)`.
    /// The structure id falls back to `"structure"` when the group has none.
    async fn group_snapshot(&self, def: &GroupPvDef) -> NtPayload {
        let mut fields = Vec::with_capacity(def.members.len());
        for member in &def.members {
            if matches!(member.mapping, FieldMapping::Proc) {
                continue;
            }
            let snapshot = self.inner.get_snapshot(&member.channel).await;
            let value = snapshot.map_or_else(
                || PvValue::Scalar(ScalarValue::I32(0)),
                |snap| payload_to_pv_value(&snap, member.mapping),
            );
            fields.push((member.field_name.clone(), value));
        }

        let struct_id = def.struct_id.as_deref().unwrap_or("structure").to_string();
        NtPayload::Generic { struct_id, fields }
    }

    /// Assembles the type descriptor of the group `def`.
    ///
    /// Field types are derived from the members' current snapshots, with the
    /// same skipping and fallback rules as `group_snapshot` (`Int32` stands in
    /// for a missing snapshot). Unlike the snapshot, a missing group id is
    /// passed through as `None`.
    async fn group_descriptor(&self, def: &GroupPvDef) -> StructureDesc {
        let mut fields = Vec::with_capacity(def.members.len());
        for member in &def.members {
            if matches!(member.mapping, FieldMapping::Proc) {
                continue;
            }
            let snapshot = self.inner.get_snapshot(&member.channel).await;
            let field_type = snapshot.map_or_else(
                || FieldType::Scalar(TypeCode::Int32),
                |snap| payload_field_type(&snap, member.mapping),
            );
            fields.push(FieldDesc {
                name: member.field_name.clone(),
                field_type,
            });
        }

        StructureDesc {
            struct_id: def.struct_id.clone(),
            fields,
        }
    }
}
/// Serves group names from the local definitions and forwards everything else
/// to the wrapped store.
impl<S: PvStore> PvStore for GroupPvStore<S> {
    /// `true` when `name` is a group or is known to the inner store.
    fn has_pv(&self, name: &str) -> impl Future<Output = bool> + Send {
        async move { self.groups.contains_key(name) || self.inner.has_pv(name).await }
    }

    /// Composite snapshot for a group; otherwise the inner store's snapshot.
    fn get_snapshot(&self, name: &str) -> impl Future<Output = Option<NtPayload>> + Send {
        async move {
            match self.groups.get(name) {
                Some(def) => Some(self.group_snapshot(def).await),
                None => self.inner.get_snapshot(name).await,
            }
        }
    }

    /// Composite descriptor for a group; otherwise the inner store's descriptor.
    fn get_descriptor(&self, name: &str) -> impl Future<Output = Option<StructureDesc>> + Send {
        async move {
            match self.groups.get(name) {
                Some(def) => Some(self.group_descriptor(def).await),
                None => self.inner.get_descriptor(name).await,
            }
        }
    }

    /// Writes `value` to `name`.
    ///
    /// For a group, `value` must be a structure. Each non-`Proc`, non-`Meta`
    /// member that has a same-named sub-value is written to its channel, in
    /// ascending `put_order`. A failing member is logged and skipped, so the
    /// call still returns `Ok` with the results of the members that succeeded.
    fn put_value(
        &self,
        name: &str,
        value: &DecodedValue,
    ) -> impl Future<Output = std::result::Result<Vec<(String, NtPayload)>, String>> + Send {
        let name = name.to_string();
        let value = value.clone();
        async move {
            let Some(def) = self.groups.get(&name) else {
                return self.inner.put_value(&name, &value).await;
            };
            let DecodedValue::Structure(fields) = &value else {
                return Err("group PUT requires a structure".to_string());
            };

            // Filtering before the stable sort keeps the same relative order
            // as sorting first and skipping afterwards.
            let mut writable: Vec<&GroupMember> = def
                .members
                .iter()
                .filter(|m| !matches!(m.mapping, FieldMapping::Proc | FieldMapping::Meta))
                .collect();
            writable.sort_by_key(|m| m.put_order);

            let mut results = Vec::new();
            for member in writable {
                let Some((_, sub_val)) = fields.iter().find(|(n, _)| n == &member.field_name)
                else {
                    continue;
                };
                match self.inner.put_value(&member.channel, sub_val).await {
                    Ok(mut updated) => results.append(&mut updated),
                    Err(e) => {
                        tracing::warn!(
                            "group PUT {}: member {} failed: {e}",
                            name,
                            member.field_name
                        );
                    }
                }
            }
            Ok(results)
        }
    }

    /// A group is writable when at least one non-`Proc`, non-`Meta` member
    /// channel is writable in the inner store.
    fn is_writable(&self, name: &str) -> impl Future<Output = bool> + Send {
        async move {
            let Some(def) = self.groups.get(name) else {
                return self.inner.is_writable(name).await;
            };
            for member in &def.members {
                let never_written =
                    matches!(member.mapping, FieldMapping::Proc | FieldMapping::Meta);
                if !never_written && self.inner.is_writable(&member.channel).await {
                    return true;
                }
            }
            false
        }
    }

    /// Sorted, de-duplicated union of the inner store's PV names and the group names.
    fn list_pvs(&self) -> impl Future<Output = Vec<String>> + Send {
        async move {
            let mut names = self.inner.list_pvs().await;
            names.extend(self.groups.keys().cloned());
            // Equal strings are indistinguishable, so an unstable sort is fine.
            names.sort_unstable();
            names.dedup();
            names
        }
    }

    /// Subscribes to a group's composite updates, or forwards to the inner store.
    fn subscribe(
        &self,
        name: &str,
    ) -> impl Future<Output = Option<mpsc::Receiver<NtPayload>>> + Send {
        let name = name.to_string();
        async move {
            match self.groups.get(&name) {
                Some(def) => self.subscribe_group(def).await,
                None => self.inner.subscribe(&name).await,
            }
        }
    }
}
impl<S: PvStore> GroupPvStore<S> {
    /// Subscribes to every member channel of `def` and republishes the group.
    ///
    /// Returns `None` when no member channel could be subscribed. Otherwise a
    /// background task is spawned. Whenever a member with a non-empty trigger
    /// list updates, the task re-reads all member snapshots and sends a fresh
    /// composite payload on the returned receiver (buffer size 64). The task
    /// ends when all member subscriptions have closed or the receiver is dropped.
    async fn subscribe_group(&self, def: &GroupPvDef) -> Option<mpsc::Receiver<NtPayload>> {
        let store = self.inner.clone();
        let group = def.clone();

        let mut sources: Vec<(String, mpsc::Receiver<NtPayload>)> = Vec::new();
        for member in &group.members {
            if let Some(updates) = store.subscribe(&member.channel).await {
                sources.push((member.field_name.clone(), updates));
            }
        }
        if sources.is_empty() {
            return None;
        }

        let (tx, rx) = mpsc::channel(64);
        let trigger_map = build_trigger_map(&group);

        tokio::spawn(async move {
            while let Some(src_field) = poll_any_member(&mut sources).await {
                // Only members that declare at least one trigger target
                // cause the group to be republished.
                let fires = trigger_map
                    .get(&src_field)
                    .is_some_and(|targets| !targets.is_empty());
                if !fires {
                    continue;
                }

                let mut fields = Vec::with_capacity(group.members.len());
                for member in &group.members {
                    if matches!(member.mapping, FieldMapping::Proc) {
                        continue;
                    }
                    let snapshot = store.get_snapshot(&member.channel).await;
                    let value = snapshot.map_or_else(
                        || PvValue::Scalar(ScalarValue::I32(0)),
                        |snap| payload_to_pv_value(&snap, member.mapping),
                    );
                    fields.push((member.field_name.clone(), value));
                }

                let struct_id = group
                    .struct_id
                    .as_deref()
                    .unwrap_or("structure")
                    .to_string();
                let payload = NtPayload::Generic { struct_id, fields };

                // The subscriber went away: nothing left to publish to.
                if tx.send(payload).await.is_err() {
                    break;
                }
            }
        });

        Some(rx)
    }
}
/// Expands every member's trigger definition into an explicit target list.
///
/// The result maps a member's field name to the field names it triggers:
/// all fields of the group for `TriggerDef::All`, the listed names for
/// `TriggerDef::Fields`, and an empty list for `TriggerDef::None`. If two
/// members share a field name, the later one wins.
fn build_trigger_map(def: &GroupPvDef) -> HashMap<String, Vec<String>> {
    let every_field = || -> Vec<String> {
        def.members
            .iter()
            .map(|m| m.field_name.clone())
            .collect()
    };

    def.members
        .iter()
        .map(|member| {
            let targets = match &member.triggers {
                TriggerDef::None => Vec::new(),
                TriggerDef::Fields(names) => names.clone(),
                TriggerDef::All => every_field(),
            };
            (member.field_name.clone(), targets)
        })
        .collect()
}
/// Waits until any member subscription delivers an update.
///
/// Returns the field name of the member that produced the update; the
/// payload itself is discarded. When a subscription reports that it is
/// closed, every entry with that field name is removed from `members` and
/// the wait restarts. Returns `None` once `members` is empty.
async fn poll_any_member(members: &mut Vec<(String, mpsc::Receiver<NtPayload>)>) -> Option<String> {
    while !members.is_empty() {
        let mut pending: Vec<
            std::pin::Pin<Box<dyn Future<Output = (String, Option<NtPayload>)> + Send + '_>>,
        > = Vec::with_capacity(members.len());
        for (name, rx) in members.iter_mut() {
            let name = name.clone();
            pending.push(Box::pin(async move { (name, rx.recv().await) }));
        }

        let (field_name, payload) = race_all(pending).await;
        match payload {
            Some(_) => return Some(field_name),
            // Closed subscription: drop it and wait on the remaining ones.
            None => members.retain(|(n, _)| n != &field_name),
        }
    }
    None
}
/// Resolves to the output of the first future in `futs` that is ready.
///
/// On every poll the futures are polled in vector order and the first one
/// that reports `Ready` wins; the remaining futures are dropped unfinished.
///
/// # Panics
///
/// Panics when `futs` is empty.
async fn race_all<T>(futs: Vec<std::pin::Pin<Box<dyn Future<Output = T> + Send + '_>>>) -> T {
    use std::task::Poll;

    assert!(!futs.is_empty());

    let mut contenders = futs;
    std::future::poll_fn(move |cx| {
        for contender in contenders.iter_mut() {
            if let Poll::Ready(winner) = contender.as_mut().poll(cx) {
                return Poll::Ready(winner);
            }
        }
        Poll::Pending
    })
    .await
}
/// Projects `payload` into a group field value according to `mapping`.
///
/// `Scalar` and `Any` embed the full structure, `Plain` only the value,
/// `Meta` only alarm and timeStamp. `Proc` yields the placeholder `I32(0)`.
fn payload_to_pv_value(payload: &NtPayload, mapping: FieldMapping) -> PvValue {
    match mapping {
        FieldMapping::Scalar | FieldMapping::Any => payload_to_full_structure(payload),
        FieldMapping::Plain => payload_to_value_only(payload),
        FieldMapping::Meta => payload_to_meta_only(payload),
        FieldMapping::Proc => PvValue::Scalar(ScalarValue::I32(0)),
    }
}
/// Converts `payload` into a complete `PvValue::Structure`.
///
/// * Scalar: `value`, `alarm`, `timeStamp`, `display`, `control`. The
///   timestamp is the default one; no timestamp is read from the payload.
/// * Scalar array: `value`, `alarm`, `timeStamp`.
/// * Enum: `value` (`index` + `choices`), `alarm`, `timeStamp`.
/// * Generic: id and fields are copied unchanged.
/// * Any other payload: the placeholder scalar `I32(0)`.
fn payload_to_full_structure(payload: &NtPayload) -> PvValue {
    // Local constructors keep the per-variant field tables compact.
    fn entry(name: &str, value: PvValue) -> (String, PvValue) {
        (name.to_string(), value)
    }
    fn structure(id: &str, fields: Vec<(String, PvValue)>) -> PvValue {
        PvValue::Structure {
            struct_id: id.to_string(),
            fields,
        }
    }

    match payload {
        NtPayload::Scalar(nt) => {
            let display = structure(
                "display_t",
                vec![
                    entry(
                        "limitLow",
                        PvValue::Scalar(ScalarValue::F64(nt.display_low)),
                    ),
                    entry(
                        "limitHigh",
                        PvValue::Scalar(ScalarValue::F64(nt.display_high)),
                    ),
                    entry(
                        "description",
                        PvValue::Scalar(ScalarValue::Str(nt.display_description.clone())),
                    ),
                    entry("units", PvValue::Scalar(ScalarValue::Str(nt.units.clone()))),
                    entry(
                        "precision",
                        PvValue::Scalar(ScalarValue::I32(nt.display_precision)),
                    ),
                ],
            );
            let control = structure(
                "control_t",
                vec![
                    entry(
                        "limitLow",
                        PvValue::Scalar(ScalarValue::F64(nt.control_low)),
                    ),
                    entry(
                        "limitHigh",
                        PvValue::Scalar(ScalarValue::F64(nt.control_high)),
                    ),
                    entry(
                        "minStep",
                        PvValue::Scalar(ScalarValue::F64(nt.control_min_step)),
                    ),
                ],
            );
            structure(
                "epics:nt/NTScalar:1.0",
                vec![
                    entry("value", PvValue::Scalar(nt.value.clone())),
                    entry(
                        "alarm",
                        alarm_to_pv_value(nt.alarm_severity, nt.alarm_status, &nt.alarm_message),
                    ),
                    entry("timeStamp", timestamp_to_pv_value_default()),
                    entry("display", display),
                    entry("control", control),
                ],
            )
        }
        NtPayload::ScalarArray(nt) => structure(
            "epics:nt/NTScalarArray:1.0",
            vec![
                entry("value", PvValue::ScalarArray(nt.value.clone())),
                entry("alarm", alarm_pv(&nt.alarm)),
                entry("timeStamp", timestamp_pv(&nt.time_stamp)),
            ],
        ),
        NtPayload::Enum(nt) => {
            let choice = structure(
                "enum_t",
                vec![
                    entry("index", PvValue::Scalar(ScalarValue::I32(nt.index))),
                    entry(
                        "choices",
                        PvValue::ScalarArray(ScalarArrayValue::Str(nt.choices.clone())),
                    ),
                ],
            );
            structure(
                "epics:nt/NTEnum:1.0",
                vec![
                    entry("value", choice),
                    entry("alarm", alarm_pv(&nt.alarm)),
                    entry("timeStamp", timestamp_pv(&nt.time_stamp)),
                ],
            )
        }
        NtPayload::Generic { struct_id, fields } => structure(struct_id, fields.clone()),
        _ => PvValue::Scalar(ScalarValue::I32(0)),
    }
}
/// Extracts only the value part of `payload`.
///
/// Enums are reduced to their index. A generic payload yields a copy of its
/// `"value"` field. A generic payload without such a field, and any other
/// payload kind, yields the placeholder scalar `I32(0)`.
fn payload_to_value_only(payload: &NtPayload) -> PvValue {
    let placeholder = PvValue::Scalar(ScalarValue::I32(0));
    match payload {
        NtPayload::Scalar(nt) => PvValue::Scalar(nt.value.clone()),
        NtPayload::ScalarArray(nt) => PvValue::ScalarArray(nt.value.clone()),
        NtPayload::Enum(nt) => PvValue::Scalar(ScalarValue::I32(nt.index)),
        NtPayload::Generic { fields, .. } => {
            match fields.iter().find(|(n, _)| n == "value") {
                Some((_, inner)) => inner.clone(),
                None => placeholder,
            }
        }
        _ => placeholder,
    }
}
/// Extracts only the metadata (`alarm` and `timeStamp`) of `payload`.
///
/// The result is always an anonymous structure with exactly the two fields
/// `alarm` and `timeStamp`, in that order. This matches the descriptor that
/// `payload_field_type` declares for `FieldMapping::Meta` regardless of the
/// payload kind.
///
/// * Scalar: alarm from the payload, default timestamp (no timestamp is read
///   from scalar payloads here).
/// * Scalar array / enum: alarm and timestamp from the payload.
/// * Any other payload: an all-zero alarm with an empty message and the
///   default timestamp. Previously this case produced a structure with no
///   fields at all, which disagreed with the declared descriptor.
fn payload_to_meta_only(payload: &NtPayload) -> PvValue {
    let (alarm, time_stamp) = match payload {
        NtPayload::Scalar(nt) => (
            alarm_to_pv_value(nt.alarm_severity, nt.alarm_status, &nt.alarm_message),
            timestamp_to_pv_value_default(),
        ),
        NtPayload::ScalarArray(nt) => (alarm_pv(&nt.alarm), timestamp_pv(&nt.time_stamp)),
        NtPayload::Enum(nt) => (alarm_pv(&nt.alarm), timestamp_pv(&nt.time_stamp)),
        // NOTE(review): generic payloads may carry their own alarm/timeStamp
        // fields; they are not inspected here because their shape is not
        // guaranteed to match alarm_t/time_t. Confirm whether they should be.
        _ => (
            alarm_to_pv_value(0, 0, ""),
            timestamp_to_pv_value_default(),
        ),
    };

    PvValue::Structure {
        struct_id: String::new(),
        fields: vec![
            ("alarm".to_string(), alarm),
            ("timeStamp".to_string(), time_stamp),
        ],
    }
}
fn payload_field_type(payload: &NtPayload, mapping: FieldMapping) -> FieldType {
match mapping {
FieldMapping::Scalar | FieldMapping::Any => {
FieldType::Structure(descriptor_for_payload(payload))
}
FieldMapping::Plain => match payload {
NtPayload::Scalar(nt) => value_field_type(&nt.value),
NtPayload::ScalarArray(nt) => array_field_type(&nt.value),
NtPayload::Enum(_) => FieldType::Scalar(TypeCode::Int32),
_ => FieldType::Scalar(TypeCode::Int32),
},
FieldMapping::Meta => FieldType::Structure(StructureDesc {
struct_id: None,
fields: vec![
FieldDesc {
name: "alarm".to_string(),
field_type: FieldType::Structure(alarm_struct_desc()),
},
FieldDesc {
name: "timeStamp".to_string(),
field_type: FieldType::Structure(timestamp_struct_desc()),
},
],
}),
FieldMapping::Proc => FieldType::Scalar(TypeCode::Int32),
}
}
/// Maps a scalar value to the field type that describes it.
///
/// Strings map to `FieldType::String`; every other variant maps to
/// `FieldType::Scalar` with the matching type code.
fn value_field_type(sv: &ScalarValue) -> FieldType {
    let code = match sv {
        // Strings have a dedicated field type rather than a type code.
        ScalarValue::Str(_) => return FieldType::String,
        ScalarValue::Bool(_) => TypeCode::Boolean,
        ScalarValue::I8(_) => TypeCode::Int8,
        ScalarValue::I16(_) => TypeCode::Int16,
        ScalarValue::I32(_) => TypeCode::Int32,
        ScalarValue::I64(_) => TypeCode::Int64,
        ScalarValue::U8(_) => TypeCode::UInt8,
        ScalarValue::U16(_) => TypeCode::UInt16,
        ScalarValue::U32(_) => TypeCode::UInt32,
        ScalarValue::U64(_) => TypeCode::UInt64,
        ScalarValue::F32(_) => TypeCode::Float32,
        ScalarValue::F64(_) => TypeCode::Float64,
    };
    FieldType::Scalar(code)
}
/// Maps a scalar array value to the field type that describes it.
///
/// String arrays map to `FieldType::StringArray`; every other variant maps
/// to `FieldType::ScalarArray` with the matching element type code.
fn array_field_type(sav: &ScalarArrayValue) -> FieldType {
    let element = match sav {
        // String arrays have a dedicated field type rather than a type code.
        ScalarArrayValue::Str(_) => return FieldType::StringArray,
        ScalarArrayValue::Bool(_) => TypeCode::Boolean,
        ScalarArrayValue::I8(_) => TypeCode::Int8,
        ScalarArrayValue::I16(_) => TypeCode::Int16,
        ScalarArrayValue::I32(_) => TypeCode::Int32,
        ScalarArrayValue::I64(_) => TypeCode::Int64,
        ScalarArrayValue::U8(_) => TypeCode::UInt8,
        ScalarArrayValue::U16(_) => TypeCode::UInt16,
        ScalarArrayValue::U32(_) => TypeCode::UInt32,
        ScalarArrayValue::U64(_) => TypeCode::UInt64,
        ScalarArrayValue::F32(_) => TypeCode::Float32,
        ScalarArrayValue::F64(_) => TypeCode::Float64,
    };
    FieldType::ScalarArray(element)
}
fn alarm_struct_desc() -> StructureDesc {
StructureDesc {
struct_id: Some("alarm_t".to_string()),
fields: vec![
FieldDesc {
name: "severity".to_string(),
field_type: FieldType::Scalar(TypeCode::Int32),
},
FieldDesc {
name: "status".to_string(),
field_type: FieldType::Scalar(TypeCode::Int32),
},
FieldDesc {
name: "message".to_string(),
field_type: FieldType::String,
},
],
}
}
fn timestamp_struct_desc() -> StructureDesc {
StructureDesc {
struct_id: Some("time_t".to_string()),
fields: vec![
FieldDesc {
name: "secondsPastEpoch".to_string(),
field_type: FieldType::Scalar(TypeCode::Int64),
},
FieldDesc {
name: "nanoseconds".to_string(),
field_type: FieldType::Scalar(TypeCode::Int32),
},
FieldDesc {
name: "userTag".to_string(),
field_type: FieldType::Scalar(TypeCode::Int32),
},
],
}
}
/// Builds an `alarm_t` structure value from its three components, in the
/// order `severity`, `status`, `message`.
fn alarm_to_pv_value(severity: i32, status: i32, message: &str) -> PvValue {
    let severity_field = PvValue::Scalar(ScalarValue::I32(severity));
    let status_field = PvValue::Scalar(ScalarValue::I32(status));
    let message_field = PvValue::Scalar(ScalarValue::Str(message.to_string()));

    PvValue::Structure {
        struct_id: "alarm_t".to_string(),
        fields: vec![
            ("severity".to_string(), severity_field),
            ("status".to_string(), status_field),
            ("message".to_string(), message_field),
        ],
    }
}
/// Converts an `NtAlarm` into an `alarm_t` structure value.
fn alarm_pv(alarm: &spvirit_types::NtAlarm) -> PvValue {
    let message: &str = &alarm.message;
    alarm_to_pv_value(alarm.severity, alarm.status, message)
}
/// Converts an `NtTimeStamp` into a `time_t` structure value with the fields
/// `secondsPastEpoch`, `nanoseconds` and `userTag`, in that order.
fn timestamp_pv(ts: &spvirit_types::NtTimeStamp) -> PvValue {
    let seconds = PvValue::Scalar(ScalarValue::I64(ts.seconds_past_epoch));
    let nanos = PvValue::Scalar(ScalarValue::I32(ts.nanoseconds));
    let tag = PvValue::Scalar(ScalarValue::I32(ts.user_tag));

    PvValue::Structure {
        struct_id: "time_t".to_string(),
        fields: vec![
            ("secondsPastEpoch".to_string(), seconds),
            ("nanoseconds".to_string(), nanos),
            ("userTag".to_string(), tag),
        ],
    }
}
/// Builds a `time_t` structure value from the default `NtTimeStamp`.
///
/// Used where the source payload provides no timestamp of its own.
fn timestamp_to_pv_value_default() -> PvValue {
    let unset = spvirit_types::NtTimeStamp::default();
    timestamp_pv(&unset)
}
// Unit tests for the JSON group-configuration parser and for merging
// group definitions. The store wrapper itself is not exercised here.
#[cfg(test)]
mod tests {
use super::*;
// A group with "+id", "+atomic" and two members: group-level keys and
// per-member channel/type/trigger are all picked up.
#[test]
fn parse_basic_group() {
let json = r#"{
"GRP:test": {
"+id": "epics:nt/NTTable:1.0",
"+atomic": true,
"fieldA": {
"+channel": "REC:A",
"+type": "scalar",
"+trigger": "*"
},
"fieldB": {
"+channel": "REC:B",
"+type": "plain"
}
}
}"#;
let groups = parse_group_config(json).unwrap();
assert_eq!(groups.len(), 1);
let g = &groups[0];
assert_eq!(g.name, "GRP:test");
assert_eq!(g.struct_id.as_deref(), Some("epics:nt/NTTable:1.0"));
assert!(g.atomic);
assert_eq!(g.members.len(), 2);
let a = g.members.iter().find(|m| m.field_name == "fieldA").unwrap();
assert_eq!(a.channel, "REC:A");
assert_eq!(a.mapping, FieldMapping::Scalar);
assert_eq!(a.triggers, TriggerDef::All);
let b = g.members.iter().find(|m| m.field_name == "fieldB").unwrap();
assert_eq!(b.channel, "REC:B");
assert_eq!(b.mapping, FieldMapping::Plain);
}
// A member with only "+channel" gets the defaults: Plain mapping,
// no triggers, put order 0.
#[test]
fn parse_minimal_member() {
let json = r#"{
"GRP:min": {
"x": { "+channel": "R:x" }
}
}"#;
let groups = parse_group_config(json).unwrap();
let m = &groups[0].members[0];
assert_eq!(m.mapping, FieldMapping::Plain); assert_eq!(m.triggers, TriggerDef::None); assert_eq!(m.put_order, 0);
}
// "+type": "proc" together with an explicit put order and a trigger
// that names the member itself.
#[test]
fn parse_proc_mapping() {
let json = r#"{
"GRP:proc": {
"go": {
"+channel": "REC:PROC",
"+type": "proc",
"+trigger": "go",
"+putorder": 99
}
}
}"#;
let groups = parse_group_config(json).unwrap();
let m = &groups[0].members[0];
assert_eq!(m.mapping, FieldMapping::Proc);
assert_eq!(m.put_order, 99);
assert_eq!(m.triggers, TriggerDef::Fields(vec!["go".into()]));
}
// A member without "+channel" is rejected.
#[test]
fn parse_error_missing_channel() {
let json = r#"{
"GRP:bad": {
"x": { "+type": "scalar" }
}
}"#;
assert!(parse_group_config(json).is_err());
}
// Several top-level groups in one document are all returned.
#[test]
fn parse_multiple_groups() {
let json = r#"{
"G:a": { "x": { "+channel": "R:x" } },
"G:b": { "y": { "+channel": "R:y" } }
}"#;
let groups = parse_group_config(json).unwrap();
assert_eq!(groups.len(), 2);
}
// A member-level "+id" ends up in GroupMember::struct_id.
#[test]
fn parse_member_id() {
let json = r#"{
"GRP:id": {
"val": {
"+channel": "R:val",
"+id": "custom_t"
}
}
}"#;
let groups = parse_group_config(json).unwrap();
assert_eq!(groups[0].members[0].struct_id.as_deref(), Some("custom_t"));
}
// Without a member-level "+id" the struct id stays None.
#[test]
fn parse_member_no_id() {
let json = r#"{
"GRP:noid": {
"v": { "+channel": "R:v" }
}
}"#;
let groups = parse_group_config(json).unwrap();
assert!(groups[0].members[0].struct_id.is_none());
}
// In record-info mode a channel without ':' is prefixed with
// "<record>." .
#[test]
fn parse_info_group_prefix() {
let json = r#"{
"TEMP:group": {
"VAL": {
"+channel": "VAL",
"+type": "plain",
"+trigger": "*"
}
}
}"#;
let groups = parse_info_group("TEMP:sensor", json).unwrap();
assert_eq!(groups[0].members[0].channel, "TEMP:sensor.VAL");
}
// In record-info mode a channel that already contains ':' is kept as-is.
#[test]
fn parse_info_group_absolute_channel() {
let json = r#"{
"TEMP:group": {
"pressure": {
"+channel": "PRESS:ai",
"+type": "scalar"
}
}
}"#;
let groups = parse_info_group("TEMP:sensor", json).unwrap();
assert_eq!(groups[0].members[0].channel, "PRESS:ai");
}
// Merging two definitions of the same group accumulates their members.
#[test]
fn merge_groups() {
let mut existing = HashMap::new();
let defs1 = parse_group_config(r#"{ "GRP:a": { "x": { "+channel": "R1:x" } } }"#).unwrap();
merge_group_defs(&mut existing, defs1);
let defs2 = parse_group_config(r#"{ "GRP:a": { "y": { "+channel": "R2:y" } } }"#).unwrap();
merge_group_defs(&mut existing, defs2);
let grp = existing.get("GRP:a").unwrap();
assert_eq!(grp.members.len(), 2);
}
// A trigger list naming a field that is not part of the group is an
// error, and the message names the offending field.
#[test]
fn trigger_validation_unknown_field() {
let json = r#"{
"GRP:bad": {
"x": {
"+channel": "R:x",
"+trigger": "y,z"
},
"y": { "+channel": "R:y" }
}
}"#;
let result = parse_group_config(json);
assert!(result.is_err());
let e = format!("{}", result.unwrap_err());
assert!(e.contains("'z'"), "expected error about 'z': {e}");
}
// Trigger lists may reference the member itself and sibling fields.
#[test]
fn trigger_validation_self_reference() {
let json = r#"{
"GRP:ok": {
"a": { "+channel": "R:a", "+trigger": "a,b" },
"b": { "+channel": "R:b", "+trigger": "a" }
}
}"#;
assert!(parse_group_config(json).is_ok());
}
// The "*" trigger is accepted without any field-name validation.
#[test]
fn trigger_validation_star_passes() {
let json = r#"{
"GRP:ok": {
"a": { "+channel": "R:a", "+trigger": "*" }
}
}"#;
assert!(parse_group_config(json).is_ok());
}
}