Skip to main content

ad_plugins_rs/
attribute.rs

1//! NDPluginAttribute: extracts named attribute values from each array.
2//!
3//! Supports up to 8 attribute channels (addr 0..7), each tracking a different
4//! attribute by name. Special pseudo-attribute names "NDArrayUniqueId" and
5//! "NDArrayTimeStamp" read from the array header.
6
7use ad_core_rs::ndarray::NDArray;
8use ad_core_rs::ndarray_pool::NDArrayPool;
9use ad_core_rs::plugin::runtime::{
10    NDPluginProcess, ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot,
11    ProcessResult,
12};
13use asyn_rs::error::AsynError;
14use asyn_rs::param::ParamType;
15use asyn_rs::port::PortDriverBase;
16
17use crate::time_series::{TimeSeriesData, TimeSeriesSender};
18
/// Upper bound on the number of attribute channels handled by the plugin
/// (channel addresses `0..MAX_ATTR_CHANNELS`).
const MAX_ATTR_CHANNELS: usize = 8;
21
/// Parameter indices for NDPluginAttribute.
///
/// Every field is `0` in the `Default` value; the real indices are filled in
/// by `register_params`, which creates one parameter per field.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct AttributeParams {
    /// Index of the `ATTR_NAME` parameter (attribute name tracked by a channel).
    pub attr_name: usize,
    /// Index of the `ATTR_VAL` parameter (last extracted value).
    pub value: usize,
    /// Index of the `ATTR_VAL_SUM` parameter (accumulated sum of values).
    pub value_sum: usize,
    /// Index of the `ATTR_RESET` parameter (non-zero write clears the sum).
    pub reset: usize,
}
30
/// State for a single attribute channel.
///
/// The derived `Default` yields an unnamed channel with both numeric fields
/// at `0.0`, which is exactly what the former hand-written impl produced.
#[derive(Clone, Debug, Default)]
struct AttrChannel {
    /// Name of the attribute to read; an empty name means the channel is unused.
    name: String,
    /// Most recently extracted value.
    value: f64,
    /// Sum of all extracted values since the last reset.
    value_sum: f64,
}
48
49impl AttrChannel {
50    fn extract_value(&self, array: &NDArray) -> Option<f64> {
51        if self.name.is_empty() {
52            return None;
53        }
54        match self.name.as_str() {
55            "NDArrayUniqueId" => Some(array.unique_id as f64),
56            "NDArrayTimeStamp" => Some(array.timestamp.as_f64()),
57            _ => array
58                .attributes
59                .get(&self.name)
60                .and_then(|attr| attr.value.as_f64()),
61        }
62    }
63}
64
65/// Processor that extracts multiple attribute values from each array.
66pub struct AttributeProcessor {
67    channels: [AttrChannel; MAX_ATTR_CHANNELS],
68    params: AttributeParams,
69    ts_sender: Option<TimeSeriesSender>,
70}
71
72impl AttributeProcessor {
73    pub fn new(attr_name: &str) -> Self {
74        let mut channels: [AttrChannel; MAX_ATTR_CHANNELS] = Default::default();
75        channels[0].name = attr_name.to_string();
76        Self {
77            channels,
78            params: AttributeParams::default(),
79            ts_sender: None,
80        }
81    }
82
83    pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
84        self.ts_sender = Some(sender);
85    }
86
87    /// Access the registered param indices (populated after register_params).
88    pub fn params(&self) -> &AttributeParams {
89        &self.params
90    }
91
92    /// Reset the accumulated sum for a channel.
93    pub fn reset(&mut self, addr: usize) {
94        if addr < MAX_ATTR_CHANNELS {
95            self.channels[addr].value_sum = 0.0;
96        }
97    }
98
99    /// Current extracted value for channel 0.
100    pub fn value(&self) -> f64 {
101        self.channels[0].value
102    }
103
104    /// Current accumulated sum for channel 0.
105    pub fn value_sum(&self) -> f64 {
106        self.channels[0].value_sum
107    }
108
109    /// The attribute name being tracked by channel 0.
110    pub fn attr_name(&self) -> &str {
111        &self.channels[0].name
112    }
113
114    /// Set the attribute name for channel 0.
115    pub fn set_attr_name(&mut self, name: &str) {
116        self.channels[0].name = name.to_string();
117    }
118}
119
120impl NDPluginProcess for AttributeProcessor {
121    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
122        let mut updates = Vec::new();
123
124        for (i, ch) in self.channels.iter_mut().enumerate() {
125            if ch.name.is_empty() {
126                continue;
127            }
128            if let Some(val) = ch.extract_value(array) {
129                ch.value = val;
130                ch.value_sum += val;
131            }
132            let addr = i as i32;
133            updates.push(ParamUpdate::float64_addr(self.params.value, addr, ch.value));
134            updates.push(ParamUpdate::float64_addr(
135                self.params.value_sum,
136                addr,
137                ch.value_sum,
138            ));
139        }
140
141        // Send to time series
142        if let Some(ref sender) = self.ts_sender {
143            let values: Vec<f64> = self.channels.iter().map(|ch| ch.value).collect();
144            let _ = sender.try_send(TimeSeriesData { values });
145        }
146
147        ProcessResult::sink(updates)
148    }
149
150    fn plugin_type(&self) -> &str {
151        "NDPluginAttribute"
152    }
153
154    fn register_params(&mut self, base: &mut PortDriverBase) -> Result<(), AsynError> {
155        self.params.attr_name = base.create_param("ATTR_NAME", ParamType::Octet)?;
156        self.params.value = base.create_param("ATTR_VAL", ParamType::Float64)?;
157        self.params.value_sum = base.create_param("ATTR_VAL_SUM", ParamType::Float64)?;
158        self.params.reset = base.create_param("ATTR_RESET", ParamType::Int32)?;
159        Ok(())
160    }
161
162    fn on_param_change(
163        &mut self,
164        reason: usize,
165        params: &PluginParamSnapshot,
166    ) -> ParamChangeResult {
167        let addr = params.addr as usize;
168
169        if reason == self.params.attr_name {
170            if addr < MAX_ATTR_CHANNELS {
171                if let ParamChangeValue::Octet(s) = &params.value {
172                    self.channels[addr].name = s.clone();
173                }
174            }
175        } else if reason == self.params.reset {
176            if params.value.as_i32() != 0 {
177                if addr < MAX_ATTR_CHANNELS {
178                    self.channels[addr].value_sum = 0.0;
179                }
180            }
181        }
182
183        ParamChangeResult::updates(vec![])
184    }
185}
186
/// Time-series channel names, one per attribute channel, in channel order.
///
/// The first channel has no numeric suffix; the remaining ones are suffixed
/// `1` through `7`.
pub fn attr_ts_channel_names() -> Vec<&'static str> {
    const NAMES: &[&str] = &[
        "TSArrayValue",
        "TSArrayValue1",
        "TSArrayValue2",
        "TSArrayValue3",
        "TSArrayValue4",
        "TSArrayValue5",
        "TSArrayValue6",
        "TSArrayValue7",
    ];
    NAMES.to_vec()
}
200
201/// Create an Attribute plugin runtime. The TS receiver is stored in the registry
202/// for later pickup by `NDTimeSeriesConfigure`.
203pub fn create_attribute_runtime(
204    port_name: &str,
205    pool: std::sync::Arc<ad_core_rs::ndarray_pool::NDArrayPool>,
206    queue_size: usize,
207    ndarray_port: &str,
208    wiring: std::sync::Arc<ad_core_rs::plugin::wiring::WiringRegistry>,
209    ts_registry: &crate::time_series::TsReceiverRegistry,
210) -> (
211    ad_core_rs::plugin::runtime::PluginRuntimeHandle,
212    std::thread::JoinHandle<()>,
213) {
214    let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
215
216    let mut processor = AttributeProcessor::new("");
217    processor.set_ts_sender(ts_tx);
218
219    let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
220        port_name,
221        processor,
222        pool,
223        queue_size,
224        ndarray_port,
225        wiring,
226        MAX_ATTR_CHANNELS,
227    );
228
229    let channel_names: Vec<String> = attr_ts_channel_names()
230        .iter()
231        .map(|s| s.to_string())
232        .collect();
233    ts_registry.store(port_name, ts_rx, channel_names);
234
235    (handle, data_jh)
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
    use ad_core_rs::ndarray::{NDDataType, NDDimension};

    /// Pool handed to `process_array`; the processor never allocates from it.
    fn test_pool() -> NDArrayPool {
        NDArrayPool::new(1_000_000)
    }

    /// Four-element `UInt8` array without any attributes.
    fn bare_array() -> NDArray {
        NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8)
    }

    /// Array carrying a single driver attribute `name` with the given value.
    fn array_with_value(name: &str, value: NDAttrValue) -> NDArray {
        let mut arr = bare_array();
        arr.attributes.add(NDAttribute {
            name: name.to_string(),
            description: String::new(),
            source: NDAttrSource::Driver,
            value,
        });
        arr
    }

    /// Array with unique id `uid` and one `Float64` attribute.
    fn make_array_with_attr(name: &str, value: f64, uid: i32) -> NDArray {
        let mut arr = array_with_value(name, NDAttrValue::Float64(value));
        arr.unique_id = uid;
        arr
    }

    /// Assert that `actual` lies strictly within `tolerance` of `expected`.
    fn assert_close(actual: f64, expected: f64, tolerance: f64) {
        assert!(
            (actual - expected).abs() < tolerance,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_extract_named_attribute() {
        let mut plugin = AttributeProcessor::new("Temperature");
        let pool = test_pool();

        let frame = make_array_with_attr("Temperature", 25.5, 1);
        let result = plugin.process_array(&frame, &pool);

        assert!(
            result.output_arrays.is_empty(),
            "attribute plugin is a sink"
        );
        assert_close(plugin.value(), 25.5, 1e-10);
        assert_close(plugin.value_sum(), 25.5, 1e-10);
    }

    #[test]
    fn test_sum_accumulation() {
        let mut plugin = AttributeProcessor::new("Intensity");
        let pool = test_pool();

        let first = make_array_with_attr("Intensity", 10.0, 1);
        plugin.process_array(&first, &pool);
        assert_close(plugin.value_sum(), 10.0, 1e-10);

        let second = make_array_with_attr("Intensity", 20.0, 2);
        plugin.process_array(&second, &pool);
        assert_close(plugin.value(), 20.0, 1e-10);
        assert_close(plugin.value_sum(), 30.0, 1e-10);
    }

    #[test]
    fn test_reset() {
        let mut plugin = AttributeProcessor::new("Count");
        let pool = test_pool();

        let frame = make_array_with_attr("Count", 100.0, 1);
        plugin.process_array(&frame, &pool);
        assert_close(plugin.value_sum(), 100.0, 1e-10);

        // Resetting clears the sum but leaves the last value in place.
        plugin.reset(0);
        assert_close(plugin.value_sum(), 0.0, 1e-10);
        assert_close(plugin.value(), 100.0, 1e-10);
    }

    #[test]
    fn test_special_attr_unique_id() {
        let mut plugin = AttributeProcessor::new("NDArrayUniqueId");
        let pool = test_pool();

        let mut frame = bare_array();
        frame.unique_id = 42;

        plugin.process_array(&frame, &pool);
        assert_close(plugin.value(), 42.0, 1e-10);
    }

    #[test]
    fn test_special_attr_timestamp() {
        let mut plugin = AttributeProcessor::new("NDArrayTimeStamp");
        let pool = test_pool();

        let mut frame = bare_array();
        frame.timestamp = ad_core_rs::timestamp::EpicsTimestamp {
            sec: 100,
            nsec: 500_000_000,
        };

        plugin.process_array(&frame, &pool);
        assert_close(plugin.value(), 100.5, 1e-9);
    }

    #[test]
    fn test_missing_attribute() {
        let mut plugin = AttributeProcessor::new("NonExistent");
        let pool = test_pool();

        let frame = bare_array();
        plugin.process_array(&frame, &pool);

        assert_close(plugin.value(), 0.0, 1e-10);
        assert_close(plugin.value_sum(), 0.0, 1e-10);
    }

    #[test]
    fn test_string_attribute_ignored() {
        let mut plugin = AttributeProcessor::new("Label");
        let pool = test_pool();

        let frame = array_with_value("Label", NDAttrValue::String("hello".to_string()));

        plugin.process_array(&frame, &pool);
        assert_close(plugin.value(), 0.0, 1e-10);
    }

    #[test]
    fn test_int32_attribute() {
        let mut plugin = AttributeProcessor::new("Counter");
        let pool = test_pool();

        let frame = array_with_value("Counter", NDAttrValue::Int32(7));

        plugin.process_array(&frame, &pool);
        assert_close(plugin.value(), 7.0, 1e-10);
    }

    #[test]
    fn test_set_attr_name() {
        let mut plugin = AttributeProcessor::new("A");
        assert_eq!(plugin.attr_name(), "A");

        plugin.set_attr_name("B");
        assert_eq!(plugin.attr_name(), "B");

        let pool = test_pool();
        let frame = make_array_with_attr("B", 99.0, 1);
        plugin.process_array(&frame, &pool);
        assert_close(plugin.value(), 99.0, 1e-10);
    }
}
387}