1use std::collections::HashMap;
27use std::fmt;
28
29use serde::Deserialize;
30
/// How a member channel's payload is projected into its field of the group structure.
///
/// Selected by the member's `+type` key; `plain` is used when the key is absent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldMapping {
    /// `"scalar"`: embed the member's full structure (see `payload_to_full_structure`).
    Scalar,
    /// `"plain"`: embed only the member's value (see `payload_to_value_only`).
    Plain,
    /// `"meta"`: embed only alarm and timeStamp; skipped on group PUT.
    Meta,
    /// `"any"`: currently handled the same way as [`FieldMapping::Scalar`].
    Any,
    /// `"proc"`: contributes no field to snapshots or descriptors; skipped on group PUT.
    Proc,
}
49
/// Which group fields an update of a member triggers, parsed from its `+trigger` key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TriggerDef {
    /// `"*"`: the member triggers every field of the group.
    All,
    /// Comma-separated list: the member triggers exactly the named fields.
    Fields(Vec<String>),
    /// Key absent or empty: the member triggers nothing.
    None,
}
60
/// One field of a group PV, backed by a channel of the wrapped store.
#[derive(Debug, Clone)]
pub struct GroupMember {
    /// Field name inside the group structure (the member's key in the JSON object).
    pub field_name: String,
    /// Name of the backing channel (`+channel`).
    pub channel: String,
    /// How the channel payload is mapped into the field (`+type`).
    pub mapping: FieldMapping,
    /// Which fields an update of this member triggers (`+trigger`).
    pub triggers: TriggerDef,
    /// Sort key applied to members on group PUT, ascending (`+putorder`, default 0).
    pub put_order: i32,
    /// Optional structure id given for this member (`+id`).
    pub struct_id: Option<String>,
}
77
/// Parsed definition of a single group PV.
#[derive(Debug, Clone)]
pub struct GroupPvDef {
    /// Name under which the group PV is served.
    pub name: String,
    /// Optional structure id of the group (`+id`).
    pub struct_id: Option<String>,
    /// Value of the `+atomic` flag (false when absent).
    pub atomic: bool,
    /// Members of the group; sorted by field name when produced by the parser.
    pub members: Vec<GroupMember>,
}
90
/// Error produced while parsing or validating a group configuration.
///
/// Wraps a human-readable message; `Display` renders it as `group config: <message>`.
#[derive(Debug)]
pub struct GroupConfigError(String);

impl fmt::Display for GroupConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(detail) = self;
        f.write_str("group config: ")?;
        f.write_str(detail)
    }
}

impl std::error::Error for GroupConfigError {}
105
106type Result<T> = std::result::Result<T, GroupConfigError>;
107
108fn err(msg: impl Into<String>) -> GroupConfigError {
109 GroupConfigError(msg.into())
110}
111
/// Top-level JSON shape: a map from group PV name to its raw definition.
type RawConfig = HashMap<String, RawGroupDef>;
118
/// Group definition as it appears in JSON, before validation.
///
/// `+id` and `+atomic` are group-level options; every other key is collected
/// (via `flatten`) as a member definition keyed by field name.
#[derive(Deserialize)]
struct RawGroupDef {
    #[serde(rename = "+id", default)]
    id: Option<String>,
    #[serde(rename = "+atomic", default)]
    atomic: Option<bool>,
    // All remaining keys of the group object are treated as member definitions.
    #[serde(flatten)]
    members: HashMap<String, RawMember>,
}
129
/// Member definition as it appears in JSON, before validation.
///
/// Every key is optional at this stage; `parse_member` enforces that
/// `+channel` is present and that `+type` / `+trigger` hold known values.
#[derive(Deserialize)]
struct RawMember {
    #[serde(rename = "+channel", default)]
    channel: Option<String>,
    #[serde(rename = "+type", default)]
    mapping: Option<String>,
    #[serde(rename = "+trigger", default)]
    trigger: Option<String>,
    #[serde(rename = "+putorder", default)]
    putorder: Option<i32>,
    #[serde(rename = "+id", default)]
    id: Option<String>,
}
143
144pub fn parse_group_config(json: &str) -> Result<Vec<GroupPvDef>> {
152 let raw: RawConfig =
153 serde_json::from_str(json).map_err(|e| err(format!("invalid JSON: {e}")))?;
154
155 let mut groups: Vec<GroupPvDef> = Vec::with_capacity(raw.len());
156 for (name, raw_group) in raw {
157 groups.push(raw_to_group_def(name, raw_group)?);
158 }
159 groups.sort_by(|a, b| a.name.cmp(&b.name));
161 Ok(groups)
162}
163
164pub fn parse_info_group(record_name: &str, json: &str) -> Result<Vec<GroupPvDef>> {
168 let raw: RawConfig =
169 serde_json::from_str(json).map_err(|e| err(format!("invalid JSON: {e}")))?;
170
171 let mut groups: Vec<GroupPvDef> = Vec::with_capacity(raw.len());
172 for (name, mut raw_group) in raw {
173 for member in raw_group.members.values_mut() {
175 if let Some(ref mut ch) = member.channel {
176 if !ch.contains(':') {
177 *ch = format!("{record_name}.{ch}");
178 }
179 }
180 }
181 groups.push(raw_to_group_def(name, raw_group)?);
182 }
183 groups.sort_by(|a, b| a.name.cmp(&b.name));
184 Ok(groups)
185}
186
187pub fn merge_group_defs(existing: &mut HashMap<String, GroupPvDef>, new_defs: Vec<GroupPvDef>) {
192 for def in new_defs {
193 existing
194 .entry(def.name.clone())
195 .and_modify(|e| {
196 if def.struct_id.is_some() {
198 e.struct_id.clone_from(&def.struct_id);
199 }
200 e.atomic |= def.atomic;
201 e.members.extend(def.members.iter().cloned());
202 })
203 .or_insert(def);
204 }
205}
206
207fn raw_to_group_def(name: String, raw: RawGroupDef) -> Result<GroupPvDef> {
212 let mut members = Vec::with_capacity(raw.members.len());
213
214 let field_names: Vec<&str> = raw.members.keys().map(|s| s.as_str()).collect();
216
217 for (field_name, raw_member) in &raw.members {
218 members.push(parse_member(field_name, raw_member, &field_names)?);
219 }
220
221 members.sort_by(|a, b| a.field_name.cmp(&b.field_name));
223
224 Ok(GroupPvDef {
225 name,
226 struct_id: raw.id,
227 atomic: raw.atomic.unwrap_or(false),
228 members,
229 })
230}
231
232fn parse_member(field_name: &str, raw: &RawMember, all_fields: &[&str]) -> Result<GroupMember> {
233 let channel = raw
234 .channel
235 .clone()
236 .ok_or_else(|| err(format!("member '{field_name}' missing +channel")))?;
237
238 let mapping = match raw.mapping.as_deref() {
239 None | Some("plain") => FieldMapping::Plain,
240 Some("scalar") => FieldMapping::Scalar,
241 Some("meta") => FieldMapping::Meta,
242 Some("any") => FieldMapping::Any,
243 Some("proc") => FieldMapping::Proc,
244 Some(other) => {
245 return Err(err(format!(
246 "member '{field_name}': unknown +type '{other}'"
247 )));
248 }
249 };
250
251 let triggers = match raw.trigger.as_deref() {
252 None => TriggerDef::None,
253 Some("*") => TriggerDef::All,
254 Some("") => TriggerDef::None,
255 Some(spec) => {
256 let names: Vec<String> = spec.split(',').map(|s| s.trim().to_owned()).collect();
257 for n in &names {
259 if !all_fields.contains(&n.as_str()) {
260 return Err(err(format!(
261 "member '{field_name}': trigger references unknown field '{n}'"
262 )));
263 }
264 }
265 TriggerDef::Fields(names)
266 }
267 };
268
269 Ok(GroupMember {
270 field_name: field_name.to_owned(),
271 channel,
272 mapping,
273 triggers,
274 put_order: raw.putorder.unwrap_or(0),
275 struct_id: raw.id.clone(),
276 })
277}
278
279use std::sync::Arc;
284
285use tokio::sync::mpsc;
286
287use spvirit_codec::spvd_decode::{DecodedValue, FieldDesc, FieldType, StructureDesc, TypeCode};
288use spvirit_types::{NtPayload, PvValue, ScalarArrayValue, ScalarValue};
289
290use crate::pvstore::PvStore;
291use crate::simple_store::descriptor_for_payload;
292
/// A [`PvStore`] wrapper that serves group PVs on top of another store.
///
/// Names found in `groups` are answered by combining the member channels of
/// the wrapped store; every other name is forwarded to the wrapped store.
pub struct GroupPvStore<S: PvStore> {
    /// The wrapped store that provides the member channels.
    inner: Arc<S>,
    /// Group definitions keyed by group PV name.
    groups: HashMap<String, GroupPvDef>,
}
302
303impl<S: PvStore> GroupPvStore<S> {
304 pub fn new(inner: Arc<S>, groups: HashMap<String, GroupPvDef>) -> Self {
306 Self { inner, groups }
307 }
308
309 async fn group_snapshot(&self, def: &GroupPvDef) -> NtPayload {
311 let mut fields = Vec::with_capacity(def.members.len());
312 for member in &def.members {
313 if member.mapping == FieldMapping::Proc {
314 continue; }
316 let pv_val = match self.inner.get_snapshot(&member.channel).await {
317 Some(snap) => payload_to_pv_value(&snap, member.mapping),
318 None => PvValue::Scalar(ScalarValue::I32(0)), };
320 fields.push((member.field_name.clone(), pv_val));
321 }
322 NtPayload::Generic {
323 struct_id: def
324 .struct_id
325 .clone()
326 .unwrap_or_else(|| "structure".to_string()),
327 fields,
328 }
329 }
330
331 async fn group_descriptor(&self, def: &GroupPvDef) -> StructureDesc {
333 let mut field_descs = Vec::with_capacity(def.members.len());
334 for member in &def.members {
335 if member.mapping == FieldMapping::Proc {
336 continue;
337 }
338 let field_type = match self.inner.get_snapshot(&member.channel).await {
339 Some(snap) => payload_field_type(&snap, member.mapping),
340 None => FieldType::Scalar(TypeCode::Int32), };
342 field_descs.push(FieldDesc {
343 name: member.field_name.clone(),
344 field_type,
345 });
346 }
347 StructureDesc {
348 struct_id: def.struct_id.clone(),
349 fields: field_descs,
350 }
351 }
352}
353
354impl<S: PvStore> PvStore for GroupPvStore<S> {
355 fn has_pv(&self, name: &str) -> impl Future<Output = bool> + Send {
356 async move {
357 if self.groups.contains_key(name) {
358 return true;
359 }
360 self.inner.has_pv(name).await
361 }
362 }
363
364 fn get_snapshot(&self, name: &str) -> impl Future<Output = Option<NtPayload>> + Send {
365 async move {
366 if let Some(def) = self.groups.get(name) {
367 return Some(self.group_snapshot(def).await);
368 }
369 self.inner.get_snapshot(name).await
370 }
371 }
372
373 fn get_descriptor(&self, name: &str) -> impl Future<Output = Option<StructureDesc>> + Send {
374 async move {
375 if let Some(def) = self.groups.get(name) {
376 return Some(self.group_descriptor(def).await);
377 }
378 self.inner.get_descriptor(name).await
379 }
380 }
381
382 fn put_value(
383 &self,
384 name: &str,
385 value: &DecodedValue,
386 ) -> impl Future<Output = std::result::Result<Vec<(String, NtPayload)>, String>> + Send {
387 let name = name.to_string();
388 let value = value.clone();
389 async move {
390 if let Some(def) = self.groups.get(&name) {
391 let fields = match &value {
393 DecodedValue::Structure(f) => f,
394 _ => return Err("group PUT requires a structure".to_string()),
395 };
396
397 let mut ordered: Vec<&GroupMember> = def.members.iter().collect();
399 ordered.sort_by_key(|m| m.put_order);
400
401 let mut results = Vec::new();
402 for member in ordered {
403 if member.mapping == FieldMapping::Proc || member.mapping == FieldMapping::Meta
404 {
405 continue;
406 }
407 if let Some((_, sub_val)) = fields.iter().find(|(n, _)| n == &member.field_name)
409 {
410 match self.inner.put_value(&member.channel, sub_val).await {
411 Ok(mut r) => results.append(&mut r),
412 Err(e) => {
413 tracing::warn!(
414 "group PUT {}: member {} failed: {e}",
415 name,
416 member.field_name
417 );
418 }
419 }
420 }
421 }
422 Ok(results)
423 } else {
424 self.inner.put_value(&name, &value).await
425 }
426 }
427 }
428
429 fn is_writable(&self, name: &str) -> impl Future<Output = bool> + Send {
430 async move {
431 if let Some(def) = self.groups.get(name) {
432 for member in &def.members {
434 if member.mapping == FieldMapping::Proc || member.mapping == FieldMapping::Meta
435 {
436 continue;
437 }
438 if self.inner.is_writable(&member.channel).await {
439 return true;
440 }
441 }
442 return false;
443 }
444 self.inner.is_writable(name).await
445 }
446 }
447
448 fn list_pvs(&self) -> impl Future<Output = Vec<String>> + Send {
449 async move {
450 let mut pvs = self.inner.list_pvs().await;
451 pvs.extend(self.groups.keys().cloned());
452 pvs.sort();
453 pvs.dedup();
454 pvs
455 }
456 }
457
458 fn subscribe(
459 &self,
460 name: &str,
461 ) -> impl Future<Output = Option<mpsc::Receiver<NtPayload>>> + Send {
462 let name = name.to_string();
463 async move {
464 if let Some(def) = self.groups.get(&name) {
465 return self.subscribe_group(def).await;
466 }
467 self.inner.subscribe(&name).await
468 }
469 }
470}
471
472impl<S: PvStore> GroupPvStore<S> {
473 async fn subscribe_group(&self, def: &GroupPvDef) -> Option<mpsc::Receiver<NtPayload>> {
478 let (tx, rx) = mpsc::channel(64);
479 let inner = self.inner.clone();
480 let def = def.clone();
481
482 let mut member_rxs: Vec<(String, mpsc::Receiver<NtPayload>)> = Vec::new();
484 for member in &def.members {
485 if let Some(member_rx) = inner.subscribe(&member.channel).await {
486 member_rxs.push((member.field_name.clone(), member_rx));
487 }
488 }
489 if member_rxs.is_empty() {
490 return None;
491 }
492
493 let trigger_map = build_trigger_map(&def);
495
496 tokio::spawn(async move {
497 loop {
501 let src_field = match poll_any_member(&mut member_rxs).await {
503 Some(field_name) => field_name,
504 None => break, };
506
507 {
508 let should_send = match trigger_map.get(&src_field) {
510 Some(targets) => !targets.is_empty(),
511 None => false,
512 };
513
514 if should_send {
515 let mut fields = Vec::with_capacity(def.members.len());
517 for member in &def.members {
518 if member.mapping == FieldMapping::Proc {
519 continue;
520 }
521 let pv_val = match inner.get_snapshot(&member.channel).await {
522 Some(snap) => payload_to_pv_value(&snap, member.mapping),
523 None => PvValue::Scalar(ScalarValue::I32(0)),
524 };
525 fields.push((member.field_name.clone(), pv_val));
526 }
527 let payload = NtPayload::Generic {
528 struct_id: def
529 .struct_id
530 .clone()
531 .unwrap_or_else(|| "structure".to_string()),
532 fields,
533 };
534 if tx.send(payload).await.is_err() {
535 break; }
537 }
538 }
539 }
540 });
541
542 Some(rx)
543 }
544}
545
546fn build_trigger_map(def: &GroupPvDef) -> HashMap<String, Vec<String>> {
548 let all_fields: Vec<String> = def.members.iter().map(|m| m.field_name.clone()).collect();
549 let mut map: HashMap<String, Vec<String>> = HashMap::new();
550
551 for member in &def.members {
552 let targets = match &member.triggers {
553 TriggerDef::All => all_fields.clone(),
554 TriggerDef::Fields(names) => names.clone(),
555 TriggerDef::None => Vec::new(),
556 };
557 map.insert(member.field_name.clone(), targets);
558 }
559 map
560}
561
562async fn poll_any_member(members: &mut Vec<(String, mpsc::Receiver<NtPayload>)>) -> Option<String> {
565 if members.is_empty() {
566 return None;
567 }
568
569 let futs: Vec<_> = members
571 .iter_mut()
572 .map(|(name, rx)| {
573 let name = name.clone();
574 Box::pin(async move { (name, rx.recv().await) })
575 as std::pin::Pin<Box<dyn Future<Output = (String, Option<NtPayload>)> + Send + '_>>
576 })
577 .collect();
578
579 let (field_name, payload) = race_all(futs).await;
580
581 if payload.is_none() {
582 members.retain(|(n, _)| n != &field_name);
584 if members.is_empty() {
585 return None;
586 }
587 return Box::pin(poll_any_member(members)).await;
588 }
589
590 Some(field_name)
591}
592
/// Resolves to the output of whichever future in `futs` becomes ready first.
///
/// On every poll the futures are polled in vector order and the first one
/// found ready wins; later futures are not polled on that pass. The remaining
/// futures are dropped when this function returns.
///
/// # Panics
///
/// Panics if `futs` is empty.
async fn race_all<T>(futs: Vec<std::pin::Pin<Box<dyn Future<Output = T> + Send + '_>>>) -> T {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    assert!(!futs.is_empty());

    /// Future adapter that completes with the first ready candidate.
    struct FirstReady<'a, T> {
        candidates: Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>,
    }

    impl<T> Future for FirstReady<'_, T> {
        type Output = T;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let winner = self
                .candidates
                .iter_mut()
                .find_map(|candidate| match candidate.as_mut().poll(cx) {
                    Poll::Ready(out) => Some(out),
                    Poll::Pending => None,
                });
            match winner {
                Some(out) => Poll::Ready(out),
                None => Poll::Pending,
            }
        }
    }

    FirstReady { candidates: futs }.await
}
618
619fn payload_to_pv_value(payload: &NtPayload, mapping: FieldMapping) -> PvValue {
626 match mapping {
627 FieldMapping::Scalar => payload_to_full_structure(payload),
628 FieldMapping::Plain => payload_to_value_only(payload),
629 FieldMapping::Meta => payload_to_meta_only(payload),
630 FieldMapping::Any => payload_to_full_structure(payload),
631 FieldMapping::Proc => PvValue::Scalar(ScalarValue::I32(0)), }
633}
634
635fn payload_to_full_structure(payload: &NtPayload) -> PvValue {
637 match payload {
638 NtPayload::Scalar(nt) => {
639 let mut fields = vec![
640 ("value".to_string(), PvValue::Scalar(nt.value.clone())),
641 (
642 "alarm".to_string(),
643 alarm_to_pv_value(nt.alarm_severity, nt.alarm_status, &nt.alarm_message),
644 ),
645 ("timeStamp".to_string(), timestamp_to_pv_value_default()),
646 ];
647 fields.push((
648 "display".to_string(),
649 PvValue::Structure {
650 struct_id: "display_t".to_string(),
651 fields: vec![
652 (
653 "limitLow".to_string(),
654 PvValue::Scalar(ScalarValue::F64(nt.display_low)),
655 ),
656 (
657 "limitHigh".to_string(),
658 PvValue::Scalar(ScalarValue::F64(nt.display_high)),
659 ),
660 (
661 "description".to_string(),
662 PvValue::Scalar(ScalarValue::Str(nt.display_description.clone())),
663 ),
664 (
665 "units".to_string(),
666 PvValue::Scalar(ScalarValue::Str(nt.units.clone())),
667 ),
668 (
669 "precision".to_string(),
670 PvValue::Scalar(ScalarValue::I32(nt.display_precision)),
671 ),
672 ],
673 },
674 ));
675 fields.push((
676 "control".to_string(),
677 PvValue::Structure {
678 struct_id: "control_t".to_string(),
679 fields: vec![
680 (
681 "limitLow".to_string(),
682 PvValue::Scalar(ScalarValue::F64(nt.control_low)),
683 ),
684 (
685 "limitHigh".to_string(),
686 PvValue::Scalar(ScalarValue::F64(nt.control_high)),
687 ),
688 (
689 "minStep".to_string(),
690 PvValue::Scalar(ScalarValue::F64(nt.control_min_step)),
691 ),
692 ],
693 },
694 ));
695 PvValue::Structure {
696 struct_id: "epics:nt/NTScalar:1.0".to_string(),
697 fields,
698 }
699 }
700 NtPayload::ScalarArray(nt) => {
701 let fields = vec![
702 ("value".to_string(), PvValue::ScalarArray(nt.value.clone())),
703 (
704 "alarm".to_string(),
705 alarm_to_pv_value(nt.alarm.severity, nt.alarm.status, &nt.alarm.message),
706 ),
707 ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
708 ];
709 PvValue::Structure {
710 struct_id: "epics:nt/NTScalarArray:1.0".to_string(),
711 fields,
712 }
713 }
714 NtPayload::Enum(nt) => {
715 let fields = vec![
716 (
717 "value".to_string(),
718 PvValue::Structure {
719 struct_id: "enum_t".to_string(),
720 fields: vec![
721 (
722 "index".to_string(),
723 PvValue::Scalar(ScalarValue::I32(nt.index)),
724 ),
725 (
726 "choices".to_string(),
727 PvValue::ScalarArray(ScalarArrayValue::Str(nt.choices.clone())),
728 ),
729 ],
730 },
731 ),
732 ("alarm".to_string(), alarm_pv(&nt.alarm)),
733 ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
734 ];
735 PvValue::Structure {
736 struct_id: "epics:nt/NTEnum:1.0".to_string(),
737 fields,
738 }
739 }
740 NtPayload::Generic { struct_id, fields } => PvValue::Structure {
741 struct_id: struct_id.clone(),
742 fields: fields.clone(),
743 },
744 _ => PvValue::Scalar(ScalarValue::I32(0)),
745 }
746}
747
748fn payload_to_value_only(payload: &NtPayload) -> PvValue {
750 match payload {
751 NtPayload::Scalar(nt) => PvValue::Scalar(nt.value.clone()),
752 NtPayload::ScalarArray(nt) => PvValue::ScalarArray(nt.value.clone()),
753 NtPayload::Enum(nt) => PvValue::Scalar(ScalarValue::I32(nt.index)),
754 NtPayload::Generic { fields, .. } => {
755 fields
757 .iter()
758 .find(|(n, _)| n == "value")
759 .map(|(_, v)| v.clone())
760 .unwrap_or(PvValue::Scalar(ScalarValue::I32(0)))
761 }
762 _ => PvValue::Scalar(ScalarValue::I32(0)),
763 }
764}
765
766fn payload_to_meta_only(payload: &NtPayload) -> PvValue {
768 match payload {
769 NtPayload::Scalar(nt) => PvValue::Structure {
770 struct_id: String::new(),
771 fields: vec![
772 (
773 "alarm".to_string(),
774 alarm_to_pv_value(nt.alarm_severity, nt.alarm_status, &nt.alarm_message),
775 ),
776 ("timeStamp".to_string(), timestamp_to_pv_value_default()),
777 ],
778 },
779 NtPayload::ScalarArray(nt) => PvValue::Structure {
780 struct_id: String::new(),
781 fields: vec![
782 ("alarm".to_string(), alarm_pv(&nt.alarm)),
783 ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
784 ],
785 },
786 NtPayload::Enum(nt) => PvValue::Structure {
787 struct_id: String::new(),
788 fields: vec![
789 ("alarm".to_string(), alarm_pv(&nt.alarm)),
790 ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
791 ],
792 },
793 _ => PvValue::Structure {
794 struct_id: String::new(),
795 fields: vec![],
796 },
797 }
798}
799
800fn payload_field_type(payload: &NtPayload, mapping: FieldMapping) -> FieldType {
802 match mapping {
803 FieldMapping::Scalar | FieldMapping::Any => {
804 FieldType::Structure(descriptor_for_payload(payload))
805 }
806 FieldMapping::Plain => match payload {
807 NtPayload::Scalar(nt) => value_field_type(&nt.value),
808 NtPayload::ScalarArray(nt) => array_field_type(&nt.value),
809 NtPayload::Enum(_) => FieldType::Scalar(TypeCode::Int32),
810 _ => FieldType::Scalar(TypeCode::Int32),
811 },
812 FieldMapping::Meta => FieldType::Structure(StructureDesc {
813 struct_id: None,
814 fields: vec![
815 FieldDesc {
816 name: "alarm".to_string(),
817 field_type: FieldType::Structure(alarm_struct_desc()),
818 },
819 FieldDesc {
820 name: "timeStamp".to_string(),
821 field_type: FieldType::Structure(timestamp_struct_desc()),
822 },
823 ],
824 }),
825 FieldMapping::Proc => FieldType::Scalar(TypeCode::Int32),
826 }
827}
828
829fn value_field_type(sv: &ScalarValue) -> FieldType {
830 match sv {
831 ScalarValue::Str(_) => FieldType::String,
832 sv => {
833 let tc = match sv {
834 ScalarValue::Bool(_) => TypeCode::Boolean,
835 ScalarValue::I8(_) => TypeCode::Int8,
836 ScalarValue::I16(_) => TypeCode::Int16,
837 ScalarValue::I32(_) => TypeCode::Int32,
838 ScalarValue::I64(_) => TypeCode::Int64,
839 ScalarValue::U8(_) => TypeCode::UInt8,
840 ScalarValue::U16(_) => TypeCode::UInt16,
841 ScalarValue::U32(_) => TypeCode::UInt32,
842 ScalarValue::U64(_) => TypeCode::UInt64,
843 ScalarValue::F32(_) => TypeCode::Float32,
844 ScalarValue::F64(_) => TypeCode::Float64,
845 ScalarValue::Str(_) => unreachable!(),
846 };
847 FieldType::Scalar(tc)
848 }
849 }
850}
851
852fn array_field_type(sav: &ScalarArrayValue) -> FieldType {
853 match sav {
854 ScalarArrayValue::Str(_) => FieldType::StringArray,
855 sav => {
856 let tc = match sav {
857 ScalarArrayValue::Bool(_) => TypeCode::Boolean,
858 ScalarArrayValue::I8(_) => TypeCode::Int8,
859 ScalarArrayValue::I16(_) => TypeCode::Int16,
860 ScalarArrayValue::I32(_) => TypeCode::Int32,
861 ScalarArrayValue::I64(_) => TypeCode::Int64,
862 ScalarArrayValue::U8(_) => TypeCode::UInt8,
863 ScalarArrayValue::U16(_) => TypeCode::UInt16,
864 ScalarArrayValue::U32(_) => TypeCode::UInt32,
865 ScalarArrayValue::U64(_) => TypeCode::UInt64,
866 ScalarArrayValue::F32(_) => TypeCode::Float32,
867 ScalarArrayValue::F64(_) => TypeCode::Float64,
868 ScalarArrayValue::Str(_) => unreachable!(),
869 };
870 FieldType::ScalarArray(tc)
871 }
872 }
873}
874
875fn alarm_struct_desc() -> StructureDesc {
876 StructureDesc {
877 struct_id: Some("alarm_t".to_string()),
878 fields: vec![
879 FieldDesc {
880 name: "severity".to_string(),
881 field_type: FieldType::Scalar(TypeCode::Int32),
882 },
883 FieldDesc {
884 name: "status".to_string(),
885 field_type: FieldType::Scalar(TypeCode::Int32),
886 },
887 FieldDesc {
888 name: "message".to_string(),
889 field_type: FieldType::String,
890 },
891 ],
892 }
893}
894
895fn timestamp_struct_desc() -> StructureDesc {
896 StructureDesc {
897 struct_id: Some("time_t".to_string()),
898 fields: vec![
899 FieldDesc {
900 name: "secondsPastEpoch".to_string(),
901 field_type: FieldType::Scalar(TypeCode::Int64),
902 },
903 FieldDesc {
904 name: "nanoseconds".to_string(),
905 field_type: FieldType::Scalar(TypeCode::Int32),
906 },
907 FieldDesc {
908 name: "userTag".to_string(),
909 field_type: FieldType::Scalar(TypeCode::Int32),
910 },
911 ],
912 }
913}
914
915fn alarm_to_pv_value(severity: i32, status: i32, message: &str) -> PvValue {
916 PvValue::Structure {
917 struct_id: "alarm_t".to_string(),
918 fields: vec![
919 (
920 "severity".to_string(),
921 PvValue::Scalar(ScalarValue::I32(severity)),
922 ),
923 (
924 "status".to_string(),
925 PvValue::Scalar(ScalarValue::I32(status)),
926 ),
927 (
928 "message".to_string(),
929 PvValue::Scalar(ScalarValue::Str(message.to_string())),
930 ),
931 ],
932 }
933}
934
935fn alarm_pv(alarm: &spvirit_types::NtAlarm) -> PvValue {
936 alarm_to_pv_value(alarm.severity, alarm.status, &alarm.message)
937}
938
939fn timestamp_pv(ts: &spvirit_types::NtTimeStamp) -> PvValue {
940 PvValue::Structure {
941 struct_id: "time_t".to_string(),
942 fields: vec![
943 (
944 "secondsPastEpoch".to_string(),
945 PvValue::Scalar(ScalarValue::I64(ts.seconds_past_epoch)),
946 ),
947 (
948 "nanoseconds".to_string(),
949 PvValue::Scalar(ScalarValue::I32(ts.nanoseconds)),
950 ),
951 (
952 "userTag".to_string(),
953 PvValue::Scalar(ScalarValue::I32(ts.user_tag)),
954 ),
955 ],
956 }
957}
958
959fn timestamp_to_pv_value_default() -> PvValue {
960 timestamp_pv(&spvirit_types::NtTimeStamp::default())
961}
962
// Unit tests for the JSON configuration parser and the merge helper.
#[cfg(test)]
mod tests {
    use super::*;

    // A group with group-level options and two members using explicit mappings.
    #[test]
    fn parse_basic_group() {
        let json = r#"{
            "GRP:test": {
                "+id": "epics:nt/NTTable:1.0",
                "+atomic": true,
                "fieldA": {
                    "+channel": "REC:A",
                    "+type": "scalar",
                    "+trigger": "*"
                },
                "fieldB": {
                    "+channel": "REC:B",
                    "+type": "plain"
                }
            }
        }"#;

        let groups = parse_group_config(json).unwrap();
        assert_eq!(groups.len(), 1);
        let g = &groups[0];
        assert_eq!(g.name, "GRP:test");
        assert_eq!(g.struct_id.as_deref(), Some("epics:nt/NTTable:1.0"));
        assert!(g.atomic);
        assert_eq!(g.members.len(), 2);

        let a = g.members.iter().find(|m| m.field_name == "fieldA").unwrap();
        assert_eq!(a.channel, "REC:A");
        assert_eq!(a.mapping, FieldMapping::Scalar);
        assert_eq!(a.triggers, TriggerDef::All);

        let b = g.members.iter().find(|m| m.field_name == "fieldB").unwrap();
        assert_eq!(b.channel, "REC:B");
        assert_eq!(b.mapping, FieldMapping::Plain);
    }

    // Only `+channel` given: every other option takes its default.
    #[test]
    fn parse_minimal_member() {
        let json = r#"{
            "GRP:min": {
                "x": { "+channel": "R:x" }
            }
        }"#;

        let groups = parse_group_config(json).unwrap();
        let m = &groups[0].members[0];
        assert_eq!(m.mapping, FieldMapping::Plain);
        assert_eq!(m.triggers, TriggerDef::None);
        assert_eq!(m.put_order, 0);
    }

    // `proc` mapping with an explicit put order and a self-referencing trigger.
    #[test]
    fn parse_proc_mapping() {
        let json = r#"{
            "GRP:proc": {
                "go": {
                    "+channel": "REC:PROC",
                    "+type": "proc",
                    "+trigger": "go",
                    "+putorder": 99
                }
            }
        }"#;

        let groups = parse_group_config(json).unwrap();
        let m = &groups[0].members[0];
        assert_eq!(m.mapping, FieldMapping::Proc);
        assert_eq!(m.put_order, 99);
        assert_eq!(m.triggers, TriggerDef::Fields(vec!["go".into()]));
    }

    // A member without `+channel` is rejected.
    #[test]
    fn parse_error_missing_channel() {
        let json = r#"{
            "GRP:bad": {
                "x": { "+type": "scalar" }
            }
        }"#;

        assert!(parse_group_config(json).is_err());
    }

    // Several groups in one document are all returned.
    #[test]
    fn parse_multiple_groups() {
        let json = r#"{
            "G:a": { "x": { "+channel": "R:x" } },
            "G:b": { "y": { "+channel": "R:y" } }
        }"#;

        let groups = parse_group_config(json).unwrap();
        assert_eq!(groups.len(), 2);
    }

    // A member-level `+id` is preserved on the member.
    #[test]
    fn parse_member_id() {
        let json = r#"{
            "GRP:id": {
                "val": {
                    "+channel": "R:val",
                    "+id": "custom_t"
                }
            }
        }"#;

        let groups = parse_group_config(json).unwrap();
        assert_eq!(groups[0].members[0].struct_id.as_deref(), Some("custom_t"));
    }

    // Without `+id` the member has no struct id.
    #[test]
    fn parse_member_no_id() {
        let json = r#"{
            "GRP:noid": {
                "v": { "+channel": "R:v" }
            }
        }"#;

        let groups = parse_group_config(json).unwrap();
        assert!(groups[0].members[0].struct_id.is_none());
    }

    // A channel without ':' is prefixed with the record name.
    #[test]
    fn parse_info_group_prefix() {
        let json = r#"{
            "TEMP:group": {
                "VAL": {
                    "+channel": "VAL",
                    "+type": "plain",
                    "+trigger": "*"
                }
            }
        }"#;

        let groups = parse_info_group("TEMP:sensor", json).unwrap();
        assert_eq!(groups[0].members[0].channel, "TEMP:sensor.VAL");
    }

    // A channel containing ':' is left untouched.
    #[test]
    fn parse_info_group_absolute_channel() {
        let json = r#"{
            "TEMP:group": {
                "pressure": {
                    "+channel": "PRESS:ai",
                    "+type": "scalar"
                }
            }
        }"#;

        let groups = parse_info_group("TEMP:sensor", json).unwrap();
        assert_eq!(groups[0].members[0].channel, "PRESS:ai");
    }

    // Two definitions of the same group contribute their members to one entry.
    #[test]
    fn merge_groups() {
        let mut existing = HashMap::new();
        let defs1 = parse_group_config(r#"{ "GRP:a": { "x": { "+channel": "R1:x" } } }"#).unwrap();
        merge_group_defs(&mut existing, defs1);

        let defs2 = parse_group_config(r#"{ "GRP:a": { "y": { "+channel": "R2:y" } } }"#).unwrap();
        merge_group_defs(&mut existing, defs2);

        let grp = existing.get("GRP:a").unwrap();
        assert_eq!(grp.members.len(), 2);
    }

    // A trigger naming a field that is not part of the group is an error.
    #[test]
    fn trigger_validation_unknown_field() {
        let json = r#"{
            "GRP:bad": {
                "x": {
                    "+channel": "R:x",
                    "+trigger": "y,z"
                },
                "y": { "+channel": "R:y" }
            }
        }"#;

        let result = parse_group_config(json);
        assert!(result.is_err());
        let e = format!("{}", result.unwrap_err());
        assert!(e.contains("'z'"), "expected error about 'z': {e}");
    }

    // A member may list itself and sibling fields as trigger targets.
    #[test]
    fn trigger_validation_self_reference() {
        let json = r#"{
            "GRP:ok": {
                "a": { "+channel": "R:a", "+trigger": "a,b" },
                "b": { "+channel": "R:b", "+trigger": "a" }
            }
        }"#;

        assert!(parse_group_config(json).is_ok());
    }

    // The `*` wildcard needs no field-name validation.
    #[test]
    fn trigger_validation_star_passes() {
        let json = r#"{
            "GRP:ok": {
                "a": { "+channel": "R:a", "+trigger": "*" }
            }
        }"#;

        assert!(parse_group_config(json).is_ok());
    }
}