Skip to main content

spvirit_server/
group.rs

1//! Group PV configuration: parse JSON definitions that compose multiple
2//! individual PVs into a single structured PVA channel.
3//!
4//! Corresponds to the C++ QSRV `group` info-tag / JSON config format.
5//! A group PV is a composite PVA channel whose top-level fields each map
6//! to a different backing PV ("member").
7//!
8//! # JSON format
9//!
10//! ```json
11//! {
12//!     "GRP:name": {
13//!         "+id": "epics:nt/NTTable:1.0",
14//!         "+atomic": true,
15//!         "fieldA": {
16//!             "+channel": "RECORD:A",
17//!             "+type": "scalar",
18//!             "+trigger": "*",
19//!             "+putorder": 0
20//!         },
21//!         "fieldB": { "+channel": "RECORD:B" }
22//!     }
23//! }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28
29use serde::Deserialize;
30
31// ---------------------------------------------------------------------------
32// Public types
33// ---------------------------------------------------------------------------
34
/// Strategy used to project a member PV's snapshot into its group field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FieldMapping {
    /// Full NTScalar/NTScalarArray structure including metadata
    /// (alarm, timestamp, display, control).
    Scalar,
    /// Bare value, stripped of all metadata.
    Plain,
    /// Alarm and timestamp only; the value itself is not transferred.
    Meta,
    /// Pass-through, wrapped as a variant union.
    Any,
    /// Processing trigger only: a put processes the record and carries no value.
    Proc,
}
49
/// Which group fields are re-published after a change to one member.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TriggerDef {
    /// Every field of the group (written `"*"` in the configuration).
    All,
    /// Just the listed field names.
    Fields(Vec<String>),
    /// Nothing: changes to this member never cause an update.
    None,
}
60
61/// Definition of one field inside a group PV.
62#[derive(Debug, Clone)]
63pub struct GroupMember {
64    /// Field name in the group structure.
65    pub field_name: String,
66    /// Backing channel / PV name.
67    pub channel: String,
68    /// How the PV value is mapped.
69    pub mapping: FieldMapping,
70    /// Which fields a change to this member triggers.
71    pub triggers: TriggerDef,
72    /// Ordering for put operations (lower = earlier).
73    pub put_order: i32,
74    /// Optional struct-id override for this field.
75    pub struct_id: Option<String>,
76}
77
78/// A group PV definition: a structured channel composed of several member PVs.
79#[derive(Debug, Clone)]
80pub struct GroupPvDef {
81    /// Channel name for the group.
82    pub name: String,
83    /// Optional struct-id (e.g. `"epics:nt/NTTable:1.0"`).
84    pub struct_id: Option<String>,
85    /// Whether GET/PUT/MONITOR operate atomically across all members.
86    pub atomic: bool,
87    /// Ordered list of member field definitions.
88    pub members: Vec<GroupMember>,
89}
90
91// ---------------------------------------------------------------------------
92// Error
93// ---------------------------------------------------------------------------
94
/// Error produced while parsing or validating a group PV configuration.
///
/// Carries a human-readable message; the `Display` output is that message
/// prefixed with `group config: `.
#[derive(Debug)]
pub struct GroupConfigError(String);

impl fmt::Display for GroupConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("group config: ")?;
        f.write_str(&self.0)
    }
}

impl std::error::Error for GroupConfigError {}
105
106type Result<T> = std::result::Result<T, GroupConfigError>;
107
108fn err(msg: impl Into<String>) -> GroupConfigError {
109    GroupConfigError(msg.into())
110}
111
112// ---------------------------------------------------------------------------
113// Serde helpers (intermediate JSON representation)
114// ---------------------------------------------------------------------------
115
116/// Top-level JSON: `{ "GROUP:name": { ...members... } }`
117type RawConfig = HashMap<String, RawGroupDef>;
118
119/// One group definition: meta keys (`+id`, `+atomic`) mixed with member defs.
120#[derive(Deserialize)]
121struct RawGroupDef {
122    #[serde(rename = "+id", default)]
123    id: Option<String>,
124    #[serde(rename = "+atomic", default)]
125    atomic: Option<bool>,
126    #[serde(flatten)]
127    members: HashMap<String, RawMember>,
128}
129
130#[derive(Deserialize)]
131struct RawMember {
132    #[serde(rename = "+channel", default)]
133    channel: Option<String>,
134    #[serde(rename = "+type", default)]
135    mapping: Option<String>,
136    #[serde(rename = "+trigger", default)]
137    trigger: Option<String>,
138    #[serde(rename = "+putorder", default)]
139    putorder: Option<i32>,
140    #[serde(rename = "+id", default)]
141    id: Option<String>,
142}
143
144// ---------------------------------------------------------------------------
145// Public API
146// ---------------------------------------------------------------------------
147
148/// Parse a JSON string describing one or more group PVs.
149///
150/// Returns a `Vec<GroupPvDef>` — one entry per top-level key.
151pub fn parse_group_config(json: &str) -> Result<Vec<GroupPvDef>> {
152    let raw: RawConfig =
153        serde_json::from_str(json).map_err(|e| err(format!("invalid JSON: {e}")))?;
154
155    let mut groups: Vec<GroupPvDef> = Vec::with_capacity(raw.len());
156    for (name, raw_group) in raw {
157        groups.push(raw_to_group_def(name, raw_group)?);
158    }
159    // Sort for deterministic order.
160    groups.sort_by(|a, b| a.name.cmp(&b.name));
161    Ok(groups)
162}
163
164/// Parse a record's `info(Q:group, ...)` JSON tag.
165///
166/// Bare channel names (no `:`) are prefixed with `"{record_name}."`.
167pub fn parse_info_group(record_name: &str, json: &str) -> Result<Vec<GroupPvDef>> {
168    let raw: RawConfig =
169        serde_json::from_str(json).map_err(|e| err(format!("invalid JSON: {e}")))?;
170
171    let mut groups: Vec<GroupPvDef> = Vec::with_capacity(raw.len());
172    for (name, mut raw_group) in raw {
173        // Prefix bare channel names.
174        for member in raw_group.members.values_mut() {
175            if let Some(ref mut ch) = member.channel {
176                if !ch.contains(':') {
177                    *ch = format!("{record_name}.{ch}");
178                }
179            }
180        }
181        groups.push(raw_to_group_def(name, raw_group)?);
182    }
183    groups.sort_by(|a, b| a.name.cmp(&b.name));
184    Ok(groups)
185}
186
187/// Merge newly-parsed group defs into an existing map (accumulates members).
188///
189/// This supports the C++ pattern where multiple records each contribute
190/// members to the same group name.
191pub fn merge_group_defs(existing: &mut HashMap<String, GroupPvDef>, new_defs: Vec<GroupPvDef>) {
192    for def in new_defs {
193        existing
194            .entry(def.name.clone())
195            .and_modify(|e| {
196                // Merge struct_id — last one wins (like C++ QSRV).
197                if def.struct_id.is_some() {
198                    e.struct_id.clone_from(&def.struct_id);
199                }
200                e.atomic |= def.atomic;
201                e.members.extend(def.members.iter().cloned());
202            })
203            .or_insert(def);
204    }
205}
206
207// ---------------------------------------------------------------------------
208// Internal helpers
209// ---------------------------------------------------------------------------
210
211fn raw_to_group_def(name: String, raw: RawGroupDef) -> Result<GroupPvDef> {
212    let mut members = Vec::with_capacity(raw.members.len());
213
214    // Collect all field names first (for trigger validation).
215    let field_names: Vec<&str> = raw.members.keys().map(|s| s.as_str()).collect();
216
217    for (field_name, raw_member) in &raw.members {
218        members.push(parse_member(field_name, raw_member, &field_names)?);
219    }
220
221    // Sort members by field name for deterministic ordering.
222    members.sort_by(|a, b| a.field_name.cmp(&b.field_name));
223
224    Ok(GroupPvDef {
225        name,
226        struct_id: raw.id,
227        atomic: raw.atomic.unwrap_or(false),
228        members,
229    })
230}
231
232fn parse_member(field_name: &str, raw: &RawMember, all_fields: &[&str]) -> Result<GroupMember> {
233    let channel = raw
234        .channel
235        .clone()
236        .ok_or_else(|| err(format!("member '{field_name}' missing +channel")))?;
237
238    let mapping = match raw.mapping.as_deref() {
239        None | Some("plain") => FieldMapping::Plain,
240        Some("scalar") => FieldMapping::Scalar,
241        Some("meta") => FieldMapping::Meta,
242        Some("any") => FieldMapping::Any,
243        Some("proc") => FieldMapping::Proc,
244        Some(other) => {
245            return Err(err(format!(
246                "member '{field_name}': unknown +type '{other}'"
247            )));
248        }
249    };
250
251    let triggers = match raw.trigger.as_deref() {
252        None => TriggerDef::None,
253        Some("*") => TriggerDef::All,
254        Some("") => TriggerDef::None,
255        Some(spec) => {
256            let names: Vec<String> = spec.split(',').map(|s| s.trim().to_owned()).collect();
257            // Validate that every named trigger refers to a known field.
258            for n in &names {
259                if !all_fields.contains(&n.as_str()) {
260                    return Err(err(format!(
261                        "member '{field_name}': trigger references unknown field '{n}'"
262                    )));
263                }
264            }
265            TriggerDef::Fields(names)
266        }
267    };
268
269    Ok(GroupMember {
270        field_name: field_name.to_owned(),
271        channel,
272        mapping,
273        triggers,
274        put_order: raw.putorder.unwrap_or(0),
275        struct_id: raw.id.clone(),
276    })
277}
278
279// ---------------------------------------------------------------------------
280// GroupPvStore — PvStore wrapper that serves group PVs
281// ---------------------------------------------------------------------------
282
283use std::sync::Arc;
284
285use tokio::sync::mpsc;
286
287use spvirit_codec::spvd_decode::{DecodedValue, FieldDesc, FieldType, StructureDesc, TypeCode};
288use spvirit_types::{NtPayload, PvValue, ScalarArrayValue, ScalarValue};
289
290use crate::pvstore::PvStore;
291use crate::simple_store::descriptor_for_payload;
292
293/// A [`PvStore`] wrapper that adds group-PV support on top of an inner store.
294///
295/// Group PV names are resolved by fetching each member PV from the inner store
296/// and composing them into an [`NtPayload::Generic`]. Non-group PV names are
297/// forwarded to the inner store unchanged.
298pub struct GroupPvStore<S: PvStore> {
299    inner: Arc<S>,
300    groups: HashMap<String, GroupPvDef>,
301}
302
303impl<S: PvStore> GroupPvStore<S> {
304    /// Create a new wrapper around `inner` with the given group definitions.
305    pub fn new(inner: Arc<S>, groups: HashMap<String, GroupPvDef>) -> Self {
306        Self { inner, groups }
307    }
308
309    /// Build a composite snapshot for a group PV.
310    async fn group_snapshot(&self, def: &GroupPvDef) -> NtPayload {
311        let mut fields = Vec::with_capacity(def.members.len());
312        for member in &def.members {
313            if member.mapping == FieldMapping::Proc {
314                continue; // Proc members don't contribute a value field.
315            }
316            let pv_val = match self.inner.get_snapshot(&member.channel).await {
317                Some(snap) => payload_to_pv_value(&snap, member.mapping),
318                None => PvValue::Scalar(ScalarValue::I32(0)), // disconnected fallback
319            };
320            fields.push((member.field_name.clone(), pv_val));
321        }
322        NtPayload::Generic {
323            struct_id: def
324                .struct_id
325                .clone()
326                .unwrap_or_else(|| "structure".to_string()),
327            fields,
328        }
329    }
330
331    /// Build a structure descriptor for a group PV.
332    async fn group_descriptor(&self, def: &GroupPvDef) -> StructureDesc {
333        let mut field_descs = Vec::with_capacity(def.members.len());
334        for member in &def.members {
335            if member.mapping == FieldMapping::Proc {
336                continue;
337            }
338            let field_type = match self.inner.get_snapshot(&member.channel).await {
339                Some(snap) => payload_field_type(&snap, member.mapping),
340                None => FieldType::Scalar(TypeCode::Int32), // disconnected fallback
341            };
342            field_descs.push(FieldDesc {
343                name: member.field_name.clone(),
344                field_type,
345            });
346        }
347        StructureDesc {
348            struct_id: def.struct_id.clone(),
349            fields: field_descs,
350        }
351    }
352}
353
354impl<S: PvStore> PvStore for GroupPvStore<S> {
355    fn has_pv(&self, name: &str) -> impl Future<Output = bool> + Send {
356        async move {
357            if self.groups.contains_key(name) {
358                return true;
359            }
360            self.inner.has_pv(name).await
361        }
362    }
363
364    fn get_snapshot(&self, name: &str) -> impl Future<Output = Option<NtPayload>> + Send {
365        async move {
366            if let Some(def) = self.groups.get(name) {
367                return Some(self.group_snapshot(def).await);
368            }
369            self.inner.get_snapshot(name).await
370        }
371    }
372
373    fn get_descriptor(&self, name: &str) -> impl Future<Output = Option<StructureDesc>> + Send {
374        async move {
375            if let Some(def) = self.groups.get(name) {
376                return Some(self.group_descriptor(def).await);
377            }
378            self.inner.get_descriptor(name).await
379        }
380    }
381
382    fn put_value(
383        &self,
384        name: &str,
385        value: &DecodedValue,
386    ) -> impl Future<Output = std::result::Result<Vec<(String, NtPayload)>, String>> + Send {
387        let name = name.to_string();
388        let value = value.clone();
389        async move {
390            if let Some(def) = self.groups.get(&name) {
391                // Dispatch sub-field puts to member channels.
392                let fields = match &value {
393                    DecodedValue::Structure(f) => f,
394                    _ => return Err("group PUT requires a structure".to_string()),
395                };
396
397                // Sort members by put_order.
398                let mut ordered: Vec<&GroupMember> = def.members.iter().collect();
399                ordered.sort_by_key(|m| m.put_order);
400
401                let mut results = Vec::new();
402                for member in ordered {
403                    if member.mapping == FieldMapping::Proc || member.mapping == FieldMapping::Meta
404                    {
405                        continue;
406                    }
407                    // Find the sub-field matching this member.
408                    if let Some((_, sub_val)) = fields.iter().find(|(n, _)| n == &member.field_name)
409                    {
410                        match self.inner.put_value(&member.channel, sub_val).await {
411                            Ok(mut r) => results.append(&mut r),
412                            Err(e) => {
413                                tracing::warn!(
414                                    "group PUT {}: member {} failed: {e}",
415                                    name,
416                                    member.field_name
417                                );
418                            }
419                        }
420                    }
421                }
422                Ok(results)
423            } else {
424                self.inner.put_value(&name, &value).await
425            }
426        }
427    }
428
429    fn is_writable(&self, name: &str) -> impl Future<Output = bool> + Send {
430        async move {
431            if let Some(def) = self.groups.get(name) {
432                // A group PV is writable if any non-proc non-meta member is writable.
433                for member in &def.members {
434                    if member.mapping == FieldMapping::Proc || member.mapping == FieldMapping::Meta
435                    {
436                        continue;
437                    }
438                    if self.inner.is_writable(&member.channel).await {
439                        return true;
440                    }
441                }
442                return false;
443            }
444            self.inner.is_writable(name).await
445        }
446    }
447
448    fn list_pvs(&self) -> impl Future<Output = Vec<String>> + Send {
449        async move {
450            let mut pvs = self.inner.list_pvs().await;
451            pvs.extend(self.groups.keys().cloned());
452            pvs.sort();
453            pvs.dedup();
454            pvs
455        }
456    }
457
458    fn subscribe(
459        &self,
460        name: &str,
461    ) -> impl Future<Output = Option<mpsc::Receiver<NtPayload>>> + Send {
462        let name = name.to_string();
463        async move {
464            if let Some(def) = self.groups.get(&name) {
465                return self.subscribe_group(def).await;
466            }
467            self.inner.subscribe(&name).await
468        }
469    }
470}
471
472impl<S: PvStore> GroupPvStore<S> {
473    /// Subscribe to a group PV by fanning-in member subscriptions.
474    ///
475    /// When any member PV updates, the trigger rules are evaluated and if
476    /// triggered, a full group snapshot is composed and sent to the subscriber.
477    async fn subscribe_group(&self, def: &GroupPvDef) -> Option<mpsc::Receiver<NtPayload>> {
478        let (tx, rx) = mpsc::channel(64);
479        let inner = self.inner.clone();
480        let def = def.clone();
481
482        // Collect member subscriptions.
483        let mut member_rxs: Vec<(String, mpsc::Receiver<NtPayload>)> = Vec::new();
484        for member in &def.members {
485            if let Some(member_rx) = inner.subscribe(&member.channel).await {
486                member_rxs.push((member.field_name.clone(), member_rx));
487            }
488        }
489        if member_rxs.is_empty() {
490            return None;
491        }
492
493        // Build trigger map: field_name → set of field names that get triggered.
494        let trigger_map = build_trigger_map(&def);
495
496        tokio::spawn(async move {
497            // Use a select!-like loop: poll all member receivers.
498            // We use tokio::select! with a macro-generated match is complex for
499            // dynamic member sets, so use a simpler polling approach.
500            loop {
501                // Wait for any member to produce a value.
502                let src_field = match poll_any_member(&mut member_rxs).await {
503                    Some(field_name) => field_name,
504                    None => break, // All members closed.
505                };
506
507                {
508                    // Check trigger rules.
509                    let should_send = match trigger_map.get(&src_field) {
510                        Some(targets) => !targets.is_empty(),
511                        None => false,
512                    };
513
514                    if should_send {
515                        // Compose a full group snapshot.
516                        let mut fields = Vec::with_capacity(def.members.len());
517                        for member in &def.members {
518                            if member.mapping == FieldMapping::Proc {
519                                continue;
520                            }
521                            let pv_val = match inner.get_snapshot(&member.channel).await {
522                                Some(snap) => payload_to_pv_value(&snap, member.mapping),
523                                None => PvValue::Scalar(ScalarValue::I32(0)),
524                            };
525                            fields.push((member.field_name.clone(), pv_val));
526                        }
527                        let payload = NtPayload::Generic {
528                            struct_id: def
529                                .struct_id
530                                .clone()
531                                .unwrap_or_else(|| "structure".to_string()),
532                            fields,
533                        };
534                        if tx.send(payload).await.is_err() {
535                            break; // Subscriber dropped.
536                        }
537                    }
538                }
539            }
540        });
541
542        Some(rx)
543    }
544}
545
546/// Build a map from source field_name → set of triggered field names.
547fn build_trigger_map(def: &GroupPvDef) -> HashMap<String, Vec<String>> {
548    let all_fields: Vec<String> = def.members.iter().map(|m| m.field_name.clone()).collect();
549    let mut map: HashMap<String, Vec<String>> = HashMap::new();
550
551    for member in &def.members {
552        let targets = match &member.triggers {
553            TriggerDef::All => all_fields.clone(),
554            TriggerDef::Fields(names) => names.clone(),
555            TriggerDef::None => Vec::new(),
556        };
557        map.insert(member.field_name.clone(), targets);
558    }
559    map
560}
561
562/// Wait for any member receiver to produce a value, returning the field name
563/// of the member that updated. Returns `None` when all channels are closed.
564async fn poll_any_member(members: &mut Vec<(String, mpsc::Receiver<NtPayload>)>) -> Option<String> {
565    if members.is_empty() {
566        return None;
567    }
568
569    // Create pinned futures for each member.
570    let futs: Vec<_> = members
571        .iter_mut()
572        .map(|(name, rx)| {
573            let name = name.clone();
574            Box::pin(async move { (name, rx.recv().await) })
575                as std::pin::Pin<Box<dyn Future<Output = (String, Option<NtPayload>)> + Send + '_>>
576        })
577        .collect();
578
579    let (field_name, payload) = race_all(futs).await;
580
581    if payload.is_none() {
582        // This member channel is closed; remove it.
583        members.retain(|(n, _)| n != &field_name);
584        if members.is_empty() {
585            return None;
586        }
587        return Box::pin(poll_any_member(members)).await;
588    }
589
590    Some(field_name)
591}
592
/// Drive all given futures together and resolve with the output of the first
/// one found ready.
///
/// On every poll the futures are polled in vector order and polling stops at
/// the first ready one; the remaining futures are dropped when this future
/// completes.
///
/// # Panics
///
/// Panics when `futs` is empty.
async fn race_all<T>(futs: Vec<std::pin::Pin<Box<dyn Future<Output = T> + Send + '_>>>) -> T {
    use std::task::Poll;

    assert!(!futs.is_empty());

    let mut contenders = futs;
    std::future::poll_fn(move |cx| {
        contenders
            .iter_mut()
            .find_map(|fut| match fut.as_mut().poll(cx) {
                Poll::Ready(val) => Some(val),
                Poll::Pending => None,
            })
            .map_or(Poll::Pending, Poll::Ready)
    })
    .await
}
618
619// ---------------------------------------------------------------------------
620// NtPayload → PvValue conversion helpers
621// ---------------------------------------------------------------------------
622
623/// Convert an NtPayload (member snapshot) to a PvValue for embedding in a
624/// group structure, respecting the FieldMapping.
625fn payload_to_pv_value(payload: &NtPayload, mapping: FieldMapping) -> PvValue {
626    match mapping {
627        FieldMapping::Scalar => payload_to_full_structure(payload),
628        FieldMapping::Plain => payload_to_value_only(payload),
629        FieldMapping::Meta => payload_to_meta_only(payload),
630        FieldMapping::Any => payload_to_full_structure(payload),
631        FieldMapping::Proc => PvValue::Scalar(ScalarValue::I32(0)), // shouldn't be called
632    }
633}
634
635/// Full NT structure as PvValue (Scalar mapping).
636fn payload_to_full_structure(payload: &NtPayload) -> PvValue {
637    match payload {
638        NtPayload::Scalar(nt) => {
639            let mut fields = vec![
640                ("value".to_string(), PvValue::Scalar(nt.value.clone())),
641                (
642                    "alarm".to_string(),
643                    alarm_to_pv_value(nt.alarm_severity, nt.alarm_status, &nt.alarm_message),
644                ),
645                ("timeStamp".to_string(), timestamp_to_pv_value_default()),
646            ];
647            fields.push((
648                "display".to_string(),
649                PvValue::Structure {
650                    struct_id: "display_t".to_string(),
651                    fields: vec![
652                        (
653                            "limitLow".to_string(),
654                            PvValue::Scalar(ScalarValue::F64(nt.display_low)),
655                        ),
656                        (
657                            "limitHigh".to_string(),
658                            PvValue::Scalar(ScalarValue::F64(nt.display_high)),
659                        ),
660                        (
661                            "description".to_string(),
662                            PvValue::Scalar(ScalarValue::Str(nt.display_description.clone())),
663                        ),
664                        (
665                            "units".to_string(),
666                            PvValue::Scalar(ScalarValue::Str(nt.units.clone())),
667                        ),
668                        (
669                            "precision".to_string(),
670                            PvValue::Scalar(ScalarValue::I32(nt.display_precision)),
671                        ),
672                    ],
673                },
674            ));
675            fields.push((
676                "control".to_string(),
677                PvValue::Structure {
678                    struct_id: "control_t".to_string(),
679                    fields: vec![
680                        (
681                            "limitLow".to_string(),
682                            PvValue::Scalar(ScalarValue::F64(nt.control_low)),
683                        ),
684                        (
685                            "limitHigh".to_string(),
686                            PvValue::Scalar(ScalarValue::F64(nt.control_high)),
687                        ),
688                        (
689                            "minStep".to_string(),
690                            PvValue::Scalar(ScalarValue::F64(nt.control_min_step)),
691                        ),
692                    ],
693                },
694            ));
695            PvValue::Structure {
696                struct_id: "epics:nt/NTScalar:1.0".to_string(),
697                fields,
698            }
699        }
700        NtPayload::ScalarArray(nt) => {
701            let fields = vec![
702                ("value".to_string(), PvValue::ScalarArray(nt.value.clone())),
703                (
704                    "alarm".to_string(),
705                    alarm_to_pv_value(nt.alarm.severity, nt.alarm.status, &nt.alarm.message),
706                ),
707                ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
708            ];
709            PvValue::Structure {
710                struct_id: "epics:nt/NTScalarArray:1.0".to_string(),
711                fields,
712            }
713        }
714        NtPayload::Enum(nt) => {
715            let fields = vec![
716                (
717                    "value".to_string(),
718                    PvValue::Structure {
719                        struct_id: "enum_t".to_string(),
720                        fields: vec![
721                            (
722                                "index".to_string(),
723                                PvValue::Scalar(ScalarValue::I32(nt.index)),
724                            ),
725                            (
726                                "choices".to_string(),
727                                PvValue::ScalarArray(ScalarArrayValue::Str(nt.choices.clone())),
728                            ),
729                        ],
730                    },
731                ),
732                ("alarm".to_string(), alarm_pv(&nt.alarm)),
733                ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
734            ];
735            PvValue::Structure {
736                struct_id: "epics:nt/NTEnum:1.0".to_string(),
737                fields,
738            }
739        }
740        NtPayload::Generic { struct_id, fields } => PvValue::Structure {
741            struct_id: struct_id.clone(),
742            fields: fields.clone(),
743        },
744        _ => PvValue::Scalar(ScalarValue::I32(0)),
745    }
746}
747
748/// Value-only (Plain mapping) — extract just the primary value.
749fn payload_to_value_only(payload: &NtPayload) -> PvValue {
750    match payload {
751        NtPayload::Scalar(nt) => PvValue::Scalar(nt.value.clone()),
752        NtPayload::ScalarArray(nt) => PvValue::ScalarArray(nt.value.clone()),
753        NtPayload::Enum(nt) => PvValue::Scalar(ScalarValue::I32(nt.index)),
754        NtPayload::Generic { fields, .. } => {
755            // Try to find a "value" field.
756            fields
757                .iter()
758                .find(|(n, _)| n == "value")
759                .map(|(_, v)| v.clone())
760                .unwrap_or(PvValue::Scalar(ScalarValue::I32(0)))
761        }
762        _ => PvValue::Scalar(ScalarValue::I32(0)),
763    }
764}
765
766/// Meta-only (Meta mapping) — alarm + timestamp only.
767fn payload_to_meta_only(payload: &NtPayload) -> PvValue {
768    match payload {
769        NtPayload::Scalar(nt) => PvValue::Structure {
770            struct_id: String::new(),
771            fields: vec![
772                (
773                    "alarm".to_string(),
774                    alarm_to_pv_value(nt.alarm_severity, nt.alarm_status, &nt.alarm_message),
775                ),
776                ("timeStamp".to_string(), timestamp_to_pv_value_default()),
777            ],
778        },
779        NtPayload::ScalarArray(nt) => PvValue::Structure {
780            struct_id: String::new(),
781            fields: vec![
782                ("alarm".to_string(), alarm_pv(&nt.alarm)),
783                ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
784            ],
785        },
786        NtPayload::Enum(nt) => PvValue::Structure {
787            struct_id: String::new(),
788            fields: vec![
789                ("alarm".to_string(), alarm_pv(&nt.alarm)),
790                ("timeStamp".to_string(), timestamp_pv(&nt.time_stamp)),
791            ],
792        },
793        _ => PvValue::Structure {
794            struct_id: String::new(),
795            fields: vec![],
796        },
797    }
798}
799
800/// Build a FieldType for a member's contribution to the group descriptor.
801fn payload_field_type(payload: &NtPayload, mapping: FieldMapping) -> FieldType {
802    match mapping {
803        FieldMapping::Scalar | FieldMapping::Any => {
804            FieldType::Structure(descriptor_for_payload(payload))
805        }
806        FieldMapping::Plain => match payload {
807            NtPayload::Scalar(nt) => value_field_type(&nt.value),
808            NtPayload::ScalarArray(nt) => array_field_type(&nt.value),
809            NtPayload::Enum(_) => FieldType::Scalar(TypeCode::Int32),
810            _ => FieldType::Scalar(TypeCode::Int32),
811        },
812        FieldMapping::Meta => FieldType::Structure(StructureDesc {
813            struct_id: None,
814            fields: vec![
815                FieldDesc {
816                    name: "alarm".to_string(),
817                    field_type: FieldType::Structure(alarm_struct_desc()),
818                },
819                FieldDesc {
820                    name: "timeStamp".to_string(),
821                    field_type: FieldType::Structure(timestamp_struct_desc()),
822                },
823            ],
824        }),
825        FieldMapping::Proc => FieldType::Scalar(TypeCode::Int32),
826    }
827}
828
829fn value_field_type(sv: &ScalarValue) -> FieldType {
830    match sv {
831        ScalarValue::Str(_) => FieldType::String,
832        sv => {
833            let tc = match sv {
834                ScalarValue::Bool(_) => TypeCode::Boolean,
835                ScalarValue::I8(_) => TypeCode::Int8,
836                ScalarValue::I16(_) => TypeCode::Int16,
837                ScalarValue::I32(_) => TypeCode::Int32,
838                ScalarValue::I64(_) => TypeCode::Int64,
839                ScalarValue::U8(_) => TypeCode::UInt8,
840                ScalarValue::U16(_) => TypeCode::UInt16,
841                ScalarValue::U32(_) => TypeCode::UInt32,
842                ScalarValue::U64(_) => TypeCode::UInt64,
843                ScalarValue::F32(_) => TypeCode::Float32,
844                ScalarValue::F64(_) => TypeCode::Float64,
845                ScalarValue::Str(_) => unreachable!(),
846            };
847            FieldType::Scalar(tc)
848        }
849    }
850}
851
852fn array_field_type(sav: &ScalarArrayValue) -> FieldType {
853    match sav {
854        ScalarArrayValue::Str(_) => FieldType::StringArray,
855        sav => {
856            let tc = match sav {
857                ScalarArrayValue::Bool(_) => TypeCode::Boolean,
858                ScalarArrayValue::I8(_) => TypeCode::Int8,
859                ScalarArrayValue::I16(_) => TypeCode::Int16,
860                ScalarArrayValue::I32(_) => TypeCode::Int32,
861                ScalarArrayValue::I64(_) => TypeCode::Int64,
862                ScalarArrayValue::U8(_) => TypeCode::UInt8,
863                ScalarArrayValue::U16(_) => TypeCode::UInt16,
864                ScalarArrayValue::U32(_) => TypeCode::UInt32,
865                ScalarArrayValue::U64(_) => TypeCode::UInt64,
866                ScalarArrayValue::F32(_) => TypeCode::Float32,
867                ScalarArrayValue::F64(_) => TypeCode::Float64,
868                ScalarArrayValue::Str(_) => unreachable!(),
869            };
870            FieldType::ScalarArray(tc)
871        }
872    }
873}
874
875fn alarm_struct_desc() -> StructureDesc {
876    StructureDesc {
877        struct_id: Some("alarm_t".to_string()),
878        fields: vec![
879            FieldDesc {
880                name: "severity".to_string(),
881                field_type: FieldType::Scalar(TypeCode::Int32),
882            },
883            FieldDesc {
884                name: "status".to_string(),
885                field_type: FieldType::Scalar(TypeCode::Int32),
886            },
887            FieldDesc {
888                name: "message".to_string(),
889                field_type: FieldType::String,
890            },
891        ],
892    }
893}
894
895fn timestamp_struct_desc() -> StructureDesc {
896    StructureDesc {
897        struct_id: Some("time_t".to_string()),
898        fields: vec![
899            FieldDesc {
900                name: "secondsPastEpoch".to_string(),
901                field_type: FieldType::Scalar(TypeCode::Int64),
902            },
903            FieldDesc {
904                name: "nanoseconds".to_string(),
905                field_type: FieldType::Scalar(TypeCode::Int32),
906            },
907            FieldDesc {
908                name: "userTag".to_string(),
909                field_type: FieldType::Scalar(TypeCode::Int32),
910            },
911        ],
912    }
913}
914
915fn alarm_to_pv_value(severity: i32, status: i32, message: &str) -> PvValue {
916    PvValue::Structure {
917        struct_id: "alarm_t".to_string(),
918        fields: vec![
919            (
920                "severity".to_string(),
921                PvValue::Scalar(ScalarValue::I32(severity)),
922            ),
923            (
924                "status".to_string(),
925                PvValue::Scalar(ScalarValue::I32(status)),
926            ),
927            (
928                "message".to_string(),
929                PvValue::Scalar(ScalarValue::Str(message.to_string())),
930            ),
931        ],
932    }
933}
934
935fn alarm_pv(alarm: &spvirit_types::NtAlarm) -> PvValue {
936    alarm_to_pv_value(alarm.severity, alarm.status, &alarm.message)
937}
938
939fn timestamp_pv(ts: &spvirit_types::NtTimeStamp) -> PvValue {
940    PvValue::Structure {
941        struct_id: "time_t".to_string(),
942        fields: vec![
943            (
944                "secondsPastEpoch".to_string(),
945                PvValue::Scalar(ScalarValue::I64(ts.seconds_past_epoch)),
946            ),
947            (
948                "nanoseconds".to_string(),
949                PvValue::Scalar(ScalarValue::I32(ts.nanoseconds)),
950            ),
951            (
952                "userTag".to_string(),
953                PvValue::Scalar(ScalarValue::I32(ts.user_tag)),
954            ),
955        ],
956    }
957}
958
959fn timestamp_to_pv_value_default() -> PvValue {
960    timestamp_pv(&spvirit_types::NtTimeStamp::default())
961}
962
963// ---------------------------------------------------------------------------
964// Tests
965// ---------------------------------------------------------------------------
966
#[cfg(test)]
mod tests {
    use super::*;

    /// A group with `+id`, `+atomic` and two members parses into a single
    /// definition carrying both members with their declared mappings.
    #[test]
    fn parse_basic_group() {
        let json = r#"{
            "GRP:test": {
                "+id": "epics:nt/NTTable:1.0",
                "+atomic": true,
                "fieldA": {
                    "+channel": "REC:A",
                    "+type": "scalar",
                    "+trigger": "*"
                },
                "fieldB": {
                    "+channel": "REC:B",
                    "+type": "plain"
                }
            }
        }"#;

        let parsed = parse_group_config(json).unwrap();
        assert_eq!(parsed.len(), 1);

        let group = &parsed[0];
        assert_eq!(group.name, "GRP:test");
        assert_eq!(group.struct_id.as_deref(), Some("epics:nt/NTTable:1.0"));
        assert!(group.atomic);
        assert_eq!(group.members.len(), 2);

        // Members are looked up by name rather than by position.
        let field_a = group
            .members
            .iter()
            .find(|member| member.field_name == "fieldA")
            .unwrap();
        assert_eq!(field_a.channel, "REC:A");
        assert_eq!(field_a.mapping, FieldMapping::Scalar);
        assert_eq!(field_a.triggers, TriggerDef::All);

        let field_b = group
            .members
            .iter()
            .find(|member| member.field_name == "fieldB")
            .unwrap();
        assert_eq!(field_b.channel, "REC:B");
        assert_eq!(field_b.mapping, FieldMapping::Plain);
    }

    /// A member that only names its channel picks up the defaults for
    /// mapping, triggers and put order.
    #[test]
    fn parse_minimal_member() {
        let json = r#"{
            "GRP:min": {
                "x": { "+channel": "R:x" }
            }
        }"#;

        let parsed = parse_group_config(json).unwrap();
        let member = &parsed[0].members[0];
        assert_eq!(member.mapping, FieldMapping::Plain); // default mapping
        assert_eq!(member.triggers, TriggerDef::None); // default trigger
        assert_eq!(member.put_order, 0);
    }

    /// `"+type": "proc"` together with an explicit trigger list and put
    /// order is carried through to the parsed member.
    #[test]
    fn parse_proc_mapping() {
        let json = r#"{
            "GRP:proc": {
                "go": {
                    "+channel": "REC:PROC",
                    "+type": "proc",
                    "+trigger": "go",
                    "+putorder": 99
                }
            }
        }"#;

        let parsed = parse_group_config(json).unwrap();
        let member = &parsed[0].members[0];
        assert_eq!(member.mapping, FieldMapping::Proc);
        assert_eq!(member.put_order, 99);
        assert_eq!(
            member.triggers,
            TriggerDef::Fields(vec![String::from("go")])
        );
    }

    /// A member without `+channel` is rejected.
    #[test]
    fn parse_error_missing_channel() {
        let json = r#"{
            "GRP:bad": {
                "x": { "+type": "scalar" }
            }
        }"#;

        let outcome = parse_group_config(json);
        assert!(outcome.is_err());
    }

    /// Two top-level keys produce two group definitions.
    #[test]
    fn parse_multiple_groups() {
        let json = r#"{
            "G:a": { "x": { "+channel": "R:x" } },
            "G:b": { "y": { "+channel": "R:y" } }
        }"#;

        let parsed = parse_group_config(json).unwrap();
        assert_eq!(parsed.len(), 2);
    }

    /// A member-level `+id` is stored as that member's struct id.
    #[test]
    fn parse_member_id() {
        let json = r#"{
            "GRP:id": {
                "val": {
                    "+channel": "R:val",
                    "+id": "custom_t"
                }
            }
        }"#;

        let parsed = parse_group_config(json).unwrap();
        let member = &parsed[0].members[0];
        assert_eq!(member.struct_id.as_deref(), Some("custom_t"));
    }

    /// Without a member-level `+id` the member's struct id stays unset.
    #[test]
    fn parse_member_no_id() {
        let json = r#"{
            "GRP:noid": {
                "v": { "+channel": "R:v" }
            }
        }"#;

        let parsed = parse_group_config(json).unwrap();
        let member = &parsed[0].members[0];
        assert!(member.struct_id.is_none());
    }

    /// In an info-tag group a bare field name is qualified with the owning
    /// record's name.
    #[test]
    fn parse_info_group_prefix() {
        let json = r#"{
            "TEMP:group": {
                "VAL": {
                    "+channel": "VAL",
                    "+type": "plain",
                    "+trigger": "*"
                }
            }
        }"#;

        let parsed = parse_info_group("TEMP:sensor", json).unwrap();
        // The bare channel "VAL" is expected to come back as "TEMP:sensor.VAL".
        let member = &parsed[0].members[0];
        assert_eq!(member.channel, "TEMP:sensor.VAL");
    }

    /// In an info-tag group a channel that already contains ':' is left
    /// untouched.
    #[test]
    fn parse_info_group_absolute_channel() {
        let json = r#"{
            "TEMP:group": {
                "pressure": {
                    "+channel": "PRESS:ai",
                    "+type": "scalar"
                }
            }
        }"#;

        let parsed = parse_info_group("TEMP:sensor", json).unwrap();
        // An absolute channel name must not receive the record prefix.
        let member = &parsed[0].members[0];
        assert_eq!(member.channel, "PRESS:ai");
    }

    /// Merging two definitions of the same group name accumulates their
    /// members in one entry.
    #[test]
    fn merge_groups() {
        let first_json = r#"{ "GRP:a": { "x": { "+channel": "R1:x" } } }"#;
        let second_json = r#"{ "GRP:a": { "y": { "+channel": "R2:y" } } }"#;

        let mut registry = HashMap::new();

        let first = parse_group_config(first_json).unwrap();
        merge_group_defs(&mut registry, first);

        let second = parse_group_config(second_json).unwrap();
        merge_group_defs(&mut registry, second);

        let merged = registry.get("GRP:a").unwrap();
        assert_eq!(merged.members.len(), 2);
    }

    /// A trigger list naming a field that is not part of the group is an
    /// error, and the error message names the offending field.
    #[test]
    fn trigger_validation_unknown_field() {
        let json = r#"{
            "GRP:bad": {
                "x": {
                    "+channel": "R:x",
                    "+trigger": "y,z"
                },
                "y": { "+channel": "R:y" }
            }
        }"#;

        // "y" is a member of the group, "z" is not, so parsing must fail.
        let outcome = parse_group_config(json);
        assert!(outcome.is_err());
        let e = outcome.unwrap_err().to_string();
        assert!(e.contains("'z'"), "expected error about 'z': {e}");
    }

    /// Trigger lists may name the member itself as well as sibling members.
    #[test]
    fn trigger_validation_self_reference() {
        let json = r#"{
            "GRP:ok": {
                "a": { "+channel": "R:a", "+trigger": "a,b" },
                "b": { "+channel": "R:b", "+trigger": "a" }
            }
        }"#;

        // Both the self-reference and the cross-reference must be accepted.
        let outcome = parse_group_config(json);
        assert!(outcome.is_ok());
    }

    /// The wildcard trigger `"*"` is accepted.
    #[test]
    fn trigger_validation_star_passes() {
        let json = r#"{
            "GRP:ok": {
                "a": { "+channel": "R:a", "+trigger": "*" }
            }
        }"#;

        // "*" names no individual field, so there is nothing to reject.
        let outcome = parse_group_config(json);
        assert!(outcome.is_ok());
    }
}