Skip to main content

ad_plugins_rs/
attribute.rs

1//! NDPluginAttribute: extracts named attribute values from each array.
2//!
3//! Supports up to 8 attribute channels (addr 0..7), each tracking a different
4//! attribute by name. Special pseudo-attribute names "NDArrayUniqueId" and
5//! "NDArrayTimeStamp" read from the array header.
6
7use ad_core_rs::ndarray::NDArray;
8use ad_core_rs::ndarray_pool::NDArrayPool;
9use ad_core_rs::plugin::runtime::{
10    NDPluginProcess, ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot,
11    ProcessResult,
12};
13use asyn_rs::error::AsynError;
14use asyn_rs::param::ParamType;
15use asyn_rs::port::PortDriverBase;
16
17use crate::time_series::{TimeSeriesData, TimeSeriesSender};
18
19/// Maximum number of attribute channels.
20const MAX_ATTR_CHANNELS: usize = 8;
21
22/// Parameter indices for NDPluginAttribute.
23#[derive(Clone, Copy, Default)]
24pub struct AttributeParams {
25    pub attr_name: usize,
26    pub value: usize,
27    pub value_sum: usize,
28    pub reset: usize,
29}
30
31/// State for a single attribute channel.
32#[derive(Clone)]
33struct AttrChannel {
34    name: String,
35    value: f64,
36    value_sum: f64,
37}
38
39impl Default for AttrChannel {
40    fn default() -> Self {
41        Self {
42            name: String::new(),
43            value: 0.0,
44            value_sum: 0.0,
45        }
46    }
47}
48
49impl AttrChannel {
50    fn extract_value(&self, array: &NDArray) -> Option<f64> {
51        if self.name.is_empty() {
52            return None;
53        }
54        match self.name.as_str() {
55            "NDArrayUniqueId" => Some(array.unique_id as f64),
56            "NDArrayTimeStamp" => Some(array.timestamp.as_f64()),
57            "NDArrayEpicsTSSec" => Some(array.timestamp.sec as f64),
58            "NDArrayEpicsTSnSec" => Some(array.timestamp.nsec as f64),
59            _ => array
60                .attributes
61                .get(&self.name)
62                .and_then(|attr| attr.value.as_f64()),
63        }
64    }
65}
66
67/// Processor that extracts multiple attribute values from each array.
68pub struct AttributeProcessor {
69    channels: [AttrChannel; MAX_ATTR_CHANNELS],
70    params: AttributeParams,
71    ts_sender: Option<TimeSeriesSender>,
72}
73
74impl AttributeProcessor {
75    pub fn new(attr_name: &str) -> Self {
76        let mut channels: [AttrChannel; MAX_ATTR_CHANNELS] = Default::default();
77        channels[0].name = attr_name.to_string();
78        Self {
79            channels,
80            params: AttributeParams::default(),
81            ts_sender: None,
82        }
83    }
84
85    pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
86        self.ts_sender = Some(sender);
87    }
88
89    /// Access the registered param indices (populated after register_params).
90    pub fn params(&self) -> &AttributeParams {
91        &self.params
92    }
93
94    /// Reset value and value_sum for all channels (C parity: resets all, not just one).
95    pub fn reset(&mut self) {
96        for ch in self.channels.iter_mut() {
97            ch.value = 0.0;
98            ch.value_sum = 0.0;
99        }
100    }
101
102    /// Current extracted value for channel 0.
103    pub fn value(&self) -> f64 {
104        self.channels[0].value
105    }
106
107    /// Current accumulated sum for channel 0.
108    pub fn value_sum(&self) -> f64 {
109        self.channels[0].value_sum
110    }
111
112    /// The attribute name being tracked by channel 0.
113    pub fn attr_name(&self) -> &str {
114        &self.channels[0].name
115    }
116
117    /// Set the attribute name for channel 0.
118    pub fn set_attr_name(&mut self, name: &str) {
119        self.channels[0].name = name.to_string();
120    }
121}
122
123impl NDPluginProcess for AttributeProcessor {
124    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
125        let mut updates = Vec::new();
126
127        for (i, ch) in self.channels.iter_mut().enumerate() {
128            if ch.name.is_empty() {
129                continue;
130            }
131            if let Some(val) = ch.extract_value(array) {
132                ch.value = val;
133                ch.value_sum += val;
134            }
135            let addr = i as i32;
136            updates.push(ParamUpdate::float64_addr(self.params.value, addr, ch.value));
137            updates.push(ParamUpdate::float64_addr(
138                self.params.value_sum,
139                addr,
140                ch.value_sum,
141            ));
142        }
143
144        // Send to time series
145        if let Some(ref sender) = self.ts_sender {
146            let values: Vec<f64> = self.channels.iter().map(|ch| ch.value).collect();
147            let _ = sender.try_send(TimeSeriesData { values });
148        }
149
150        ProcessResult::sink(updates)
151    }
152
153    fn plugin_type(&self) -> &str {
154        "NDPluginAttribute"
155    }
156
157    fn register_params(&mut self, base: &mut PortDriverBase) -> Result<(), AsynError> {
158        self.params.attr_name = base.create_param("ATTR_ATTRNAME", ParamType::Octet)?;
159        self.params.value = base.create_param("ATTR_VAL", ParamType::Float64)?;
160        self.params.value_sum = base.create_param("ATTR_VAL_SUM", ParamType::Float64)?;
161        self.params.reset = base.create_param("ATTR_RESET", ParamType::Int32)?;
162        Ok(())
163    }
164
165    fn on_param_change(
166        &mut self,
167        reason: usize,
168        params: &PluginParamSnapshot,
169    ) -> ParamChangeResult {
170        let addr = params.addr as usize;
171
172        if reason == self.params.attr_name {
173            if addr < MAX_ATTR_CHANNELS {
174                if let ParamChangeValue::Octet(s) = &params.value {
175                    self.channels[addr].name = s.clone();
176                }
177            }
178        } else if reason == self.params.reset {
179            if params.value.as_i32() != 0 {
180                let mut updates = Vec::new();
181                for (i, ch) in self.channels.iter_mut().enumerate() {
182                    ch.value = 0.0;
183                    ch.value_sum = 0.0;
184                    let a = i as i32;
185                    updates.push(ParamUpdate::float64_addr(self.params.value, a, 0.0));
186                    updates.push(ParamUpdate::float64_addr(self.params.value_sum, a, 0.0));
187                }
188                return ParamChangeResult::updates(updates);
189            }
190        }
191
192        ParamChangeResult::updates(vec![])
193    }
194}
195
196/// Channel names for time series (one per attribute channel).
197pub fn attr_ts_channel_names() -> Vec<&'static str> {
198    vec![
199        "TSArrayValue",
200        "TSArrayValue1",
201        "TSArrayValue2",
202        "TSArrayValue3",
203        "TSArrayValue4",
204        "TSArrayValue5",
205        "TSArrayValue6",
206        "TSArrayValue7",
207    ]
208}
209
210/// Create an Attribute plugin runtime. The TS receiver is stored in the registry
211/// for later pickup by `NDTimeSeriesConfigure`.
212pub fn create_attribute_runtime(
213    port_name: &str,
214    pool: std::sync::Arc<ad_core_rs::ndarray_pool::NDArrayPool>,
215    queue_size: usize,
216    ndarray_port: &str,
217    wiring: std::sync::Arc<ad_core_rs::plugin::wiring::WiringRegistry>,
218    ts_registry: &crate::time_series::TsReceiverRegistry,
219) -> (
220    ad_core_rs::plugin::runtime::PluginRuntimeHandle,
221    std::thread::JoinHandle<()>,
222) {
223    let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
224
225    let mut processor = AttributeProcessor::new("");
226    processor.set_ts_sender(ts_tx);
227
228    let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
229        port_name,
230        processor,
231        pool,
232        queue_size,
233        ndarray_port,
234        wiring,
235        MAX_ATTR_CHANNELS,
236    );
237
238    let channel_names: Vec<String> = attr_ts_channel_names()
239        .iter()
240        .map(|s| s.to_string())
241        .collect();
242    ts_registry.store(port_name, ts_rx, channel_names);
243
244    (handle, data_jh)
245}
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
251    use ad_core_rs::ndarray::{NDDataType, NDDimension};
252
253    fn make_array_with_attr(name: &str, value: f64, uid: i32) -> NDArray {
254        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
255        arr.unique_id = uid;
256        arr.attributes.add(NDAttribute {
257            name: name.to_string(),
258            description: String::new(),
259            source: NDAttrSource::Driver,
260            value: NDAttrValue::Float64(value),
261        });
262        arr
263    }
264
265    #[test]
266    fn test_extract_named_attribute() {
267        let mut proc = AttributeProcessor::new("Temperature");
268        let pool = NDArrayPool::new(1_000_000);
269
270        let arr = make_array_with_attr("Temperature", 25.5, 1);
271        let result = proc.process_array(&arr, &pool);
272
273        assert!(
274            result.output_arrays.is_empty(),
275            "attribute plugin is a sink"
276        );
277        assert!((proc.value() - 25.5).abs() < 1e-10);
278        assert!((proc.value_sum() - 25.5).abs() < 1e-10);
279    }
280
281    #[test]
282    fn test_sum_accumulation() {
283        let mut proc = AttributeProcessor::new("Intensity");
284        let pool = NDArrayPool::new(1_000_000);
285
286        let arr1 = make_array_with_attr("Intensity", 10.0, 1);
287        proc.process_array(&arr1, &pool);
288        assert!((proc.value_sum() - 10.0).abs() < 1e-10);
289
290        let arr2 = make_array_with_attr("Intensity", 20.0, 2);
291        proc.process_array(&arr2, &pool);
292        assert!((proc.value() - 20.0).abs() < 1e-10);
293        assert!((proc.value_sum() - 30.0).abs() < 1e-10);
294    }
295
296    #[test]
297    fn test_reset() {
298        let mut proc = AttributeProcessor::new("Count");
299        let pool = NDArrayPool::new(1_000_000);
300
301        let arr1 = make_array_with_attr("Count", 100.0, 1);
302        proc.process_array(&arr1, &pool);
303        assert!((proc.value_sum() - 100.0).abs() < 1e-10);
304
305        proc.reset();
306        assert!((proc.value_sum() - 0.0).abs() < 1e-10);
307        assert!((proc.value() - 0.0).abs() < 1e-10);
308    }
309
310    #[test]
311    fn test_special_attr_unique_id() {
312        let mut proc = AttributeProcessor::new("NDArrayUniqueId");
313        let pool = NDArrayPool::new(1_000_000);
314
315        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
316        arr.unique_id = 42;
317
318        proc.process_array(&arr, &pool);
319        assert!((proc.value() - 42.0).abs() < 1e-10);
320    }
321
322    #[test]
323    fn test_special_attr_timestamp() {
324        let mut proc = AttributeProcessor::new("NDArrayTimeStamp");
325        let pool = NDArrayPool::new(1_000_000);
326
327        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
328        arr.timestamp = ad_core_rs::timestamp::EpicsTimestamp {
329            sec: 100,
330            nsec: 500_000_000,
331        };
332
333        proc.process_array(&arr, &pool);
334        assert!((proc.value() - 100.5).abs() < 1e-9);
335    }
336
337    #[test]
338    fn test_missing_attribute() {
339        let mut proc = AttributeProcessor::new("NonExistent");
340        let pool = NDArrayPool::new(1_000_000);
341
342        let arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
343        proc.process_array(&arr, &pool);
344
345        assert!((proc.value() - 0.0).abs() < 1e-10);
346        assert!((proc.value_sum() - 0.0).abs() < 1e-10);
347    }
348
349    #[test]
350    fn test_string_attribute_ignored() {
351        let mut proc = AttributeProcessor::new("Label");
352        let pool = NDArrayPool::new(1_000_000);
353
354        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
355        arr.attributes.add(NDAttribute {
356            name: "Label".to_string(),
357            description: String::new(),
358            source: NDAttrSource::Driver,
359            value: NDAttrValue::String("hello".to_string()),
360        });
361
362        proc.process_array(&arr, &pool);
363        assert!((proc.value() - 0.0).abs() < 1e-10);
364    }
365
366    #[test]
367    fn test_int32_attribute() {
368        let mut proc = AttributeProcessor::new("Counter");
369        let pool = NDArrayPool::new(1_000_000);
370
371        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
372        arr.attributes.add(NDAttribute {
373            name: "Counter".to_string(),
374            description: String::new(),
375            source: NDAttrSource::Driver,
376            value: NDAttrValue::Int32(7),
377        });
378
379        proc.process_array(&arr, &pool);
380        assert!((proc.value() - 7.0).abs() < 1e-10);
381    }
382
383    #[test]
384    fn test_set_attr_name() {
385        let mut proc = AttributeProcessor::new("A");
386        assert_eq!(proc.attr_name(), "A");
387
388        proc.set_attr_name("B");
389        assert_eq!(proc.attr_name(), "B");
390
391        let pool = NDArrayPool::new(1_000_000);
392        let arr = make_array_with_attr("B", 99.0, 1);
393        proc.process_array(&arr, &pool);
394        assert!((proc.value() - 99.0).abs() < 1e-10);
395    }
396}