1use ad_core_rs::ndarray::NDArray;
8use ad_core_rs::ndarray_pool::NDArrayPool;
9use ad_core_rs::plugin::runtime::{
10 NDPluginProcess, ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot,
11 ProcessResult,
12};
13use asyn_rs::error::AsynError;
14use asyn_rs::param::ParamType;
15use asyn_rs::port::PortDriverBase;
16
17use crate::time_series::{TimeSeriesData, TimeSeriesSender};
18
/// Number of attribute channels held by an `AttributeProcessor`.
///
/// It sizes the channel array and is the exclusive upper bound for the `addr`
/// index accepted when a channel is renamed or reset.
const MAX_ATTR_CHANNELS: usize = 8;
21
/// Parameter indices used by the attribute plugin.
///
/// The fields are filled in by `register_params`; until then every index is
/// the `Default` value `0`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AttributeParams {
    /// Index of the `ATTR_NAME` parameter.
    pub attr_name: usize,
    /// Index of the `ATTR_VAL` parameter.
    pub value: usize,
    /// Index of the `ATTR_VAL_SUM` parameter.
    pub value_sum: usize,
    /// Index of the `ATTR_RESET` parameter.
    pub reset: usize,
}
30
/// State tracked for a single attribute channel.
///
/// The derived `Default` yields an empty name and zeroed value and sum.
#[derive(Debug, Clone, Default)]
struct AttrChannel {
    /// Name of the attribute this channel reads; empty means the channel is unused.
    name: String,
    /// Most recently extracted value.
    value: f64,
    /// Running sum of all extracted values since the last reset.
    value_sum: f64,
}
48
49impl AttrChannel {
50 fn extract_value(&self, array: &NDArray) -> Option<f64> {
51 if self.name.is_empty() {
52 return None;
53 }
54 match self.name.as_str() {
55 "NDArrayUniqueId" => Some(array.unique_id as f64),
56 "NDArrayTimeStamp" => Some(array.timestamp.as_f64()),
57 _ => array
58 .attributes
59 .get(&self.name)
60 .and_then(|attr| attr.value.as_f64()),
61 }
62 }
63}
64
/// Plugin processor that extracts attribute values from incoming arrays and
/// keeps a running sum for each of `MAX_ATTR_CHANNELS` channels.
pub struct AttributeProcessor {
    /// Per-channel state, indexed by channel address.
    channels: [AttrChannel; MAX_ATTR_CHANNELS],
    /// Parameter indices, populated by `register_params`.
    params: AttributeParams,
    /// Optional sender for the per-array channel values; `None` until
    /// `set_ts_sender` is called.
    ts_sender: Option<TimeSeriesSender>,
}
71
72impl AttributeProcessor {
73 pub fn new(attr_name: &str) -> Self {
74 let mut channels: [AttrChannel; MAX_ATTR_CHANNELS] = Default::default();
75 channels[0].name = attr_name.to_string();
76 Self {
77 channels,
78 params: AttributeParams::default(),
79 ts_sender: None,
80 }
81 }
82
83 pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
84 self.ts_sender = Some(sender);
85 }
86
87 pub fn params(&self) -> &AttributeParams {
89 &self.params
90 }
91
92 pub fn reset(&mut self, addr: usize) {
94 if addr < MAX_ATTR_CHANNELS {
95 self.channels[addr].value_sum = 0.0;
96 }
97 }
98
99 pub fn value(&self) -> f64 {
101 self.channels[0].value
102 }
103
104 pub fn value_sum(&self) -> f64 {
106 self.channels[0].value_sum
107 }
108
109 pub fn attr_name(&self) -> &str {
111 &self.channels[0].name
112 }
113
114 pub fn set_attr_name(&mut self, name: &str) {
116 self.channels[0].name = name.to_string();
117 }
118}
119
120impl NDPluginProcess for AttributeProcessor {
121 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
122 let mut updates = Vec::new();
123
124 for (i, ch) in self.channels.iter_mut().enumerate() {
125 if ch.name.is_empty() {
126 continue;
127 }
128 if let Some(val) = ch.extract_value(array) {
129 ch.value = val;
130 ch.value_sum += val;
131 }
132 let addr = i as i32;
133 updates.push(ParamUpdate::float64_addr(self.params.value, addr, ch.value));
134 updates.push(ParamUpdate::float64_addr(
135 self.params.value_sum,
136 addr,
137 ch.value_sum,
138 ));
139 }
140
141 if let Some(ref sender) = self.ts_sender {
143 let values: Vec<f64> = self.channels.iter().map(|ch| ch.value).collect();
144 let _ = sender.try_send(TimeSeriesData { values });
145 }
146
147 ProcessResult::sink(updates)
148 }
149
150 fn plugin_type(&self) -> &str {
151 "NDPluginAttribute"
152 }
153
154 fn register_params(&mut self, base: &mut PortDriverBase) -> Result<(), AsynError> {
155 self.params.attr_name = base.create_param("ATTR_NAME", ParamType::Octet)?;
156 self.params.value = base.create_param("ATTR_VAL", ParamType::Float64)?;
157 self.params.value_sum = base.create_param("ATTR_VAL_SUM", ParamType::Float64)?;
158 self.params.reset = base.create_param("ATTR_RESET", ParamType::Int32)?;
159 Ok(())
160 }
161
162 fn on_param_change(
163 &mut self,
164 reason: usize,
165 params: &PluginParamSnapshot,
166 ) -> ParamChangeResult {
167 let addr = params.addr as usize;
168
169 if reason == self.params.attr_name {
170 if addr < MAX_ATTR_CHANNELS {
171 if let ParamChangeValue::Octet(s) = ¶ms.value {
172 self.channels[addr].name = s.clone();
173 }
174 }
175 } else if reason == self.params.reset {
176 if params.value.as_i32() != 0 {
177 if addr < MAX_ATTR_CHANNELS {
178 self.channels[addr].value_sum = 0.0;
179 }
180 }
181 }
182
183 ParamChangeResult::updates(vec![])
184 }
185}
186
/// Returns the time-series channel names, one per attribute channel, in
/// channel order.
///
/// The first name carries no numeric suffix; the remaining ones are suffixed
/// `1` through `7`.
pub fn attr_ts_channel_names() -> Vec<&'static str> {
    const NAMES: &[&str] = &[
        "TSArrayValue",
        "TSArrayValue1",
        "TSArrayValue2",
        "TSArrayValue3",
        "TSArrayValue4",
        "TSArrayValue5",
        "TSArrayValue6",
        "TSArrayValue7",
    ];
    NAMES.to_vec()
}
200
201pub fn create_attribute_runtime(
204 port_name: &str,
205 pool: std::sync::Arc<ad_core_rs::ndarray_pool::NDArrayPool>,
206 queue_size: usize,
207 ndarray_port: &str,
208 wiring: std::sync::Arc<ad_core_rs::plugin::wiring::WiringRegistry>,
209 ts_registry: &crate::time_series::TsReceiverRegistry,
210) -> (
211 ad_core_rs::plugin::runtime::PluginRuntimeHandle,
212 std::thread::JoinHandle<()>,
213) {
214 let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
215
216 let mut processor = AttributeProcessor::new("");
217 processor.set_ts_sender(ts_tx);
218
219 let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
220 port_name,
221 processor,
222 pool,
223 queue_size,
224 ndarray_port,
225 wiring,
226 MAX_ATTR_CHANNELS,
227 );
228
229 let channel_names: Vec<String> = attr_ts_channel_names()
230 .iter()
231 .map(|s| s.to_string())
232 .collect();
233 ts_registry.store(port_name, ts_rx, channel_names);
234
235 (handle, data_jh)
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
    use ad_core_rs::ndarray::{NDDataType, NDDimension};

    /// Pool passed to `process_array`; the processor ignores it.
    fn test_pool() -> NDArrayPool {
        NDArrayPool::new(1_000_000)
    }

    /// Four-element `UInt8` array carrying no attributes.
    fn bare_array() -> NDArray {
        NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8)
    }

    /// Bare array carrying a single driver-sourced attribute.
    fn array_with(name: &str, value: NDAttrValue) -> NDArray {
        let mut arr = bare_array();
        arr.attributes.add(NDAttribute {
            name: name.to_string(),
            description: String::new(),
            source: NDAttrSource::Driver,
            value,
        });
        arr
    }

    /// Array with unique id `uid` and one `Float64` attribute.
    fn make_array_with_attr(name: &str, value: f64, uid: i32) -> NDArray {
        let mut arr = array_with(name, NDAttrValue::Float64(value));
        arr.unique_id = uid;
        arr
    }

    /// Asserts that `actual` lies strictly within `tol` of `expected`.
    fn assert_close(actual: f64, expected: f64, tol: f64) {
        assert!(
            (actual - expected).abs() < tol,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_extract_named_attribute() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("Temperature");

        let result = processor.process_array(&make_array_with_attr("Temperature", 25.5, 1), &pool);

        assert!(
            result.output_arrays.is_empty(),
            "attribute plugin is a sink"
        );
        assert_close(processor.value(), 25.5, 1e-10);
        assert_close(processor.value_sum(), 25.5, 1e-10);
    }

    #[test]
    fn test_sum_accumulation() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("Intensity");

        processor.process_array(&make_array_with_attr("Intensity", 10.0, 1), &pool);
        assert_close(processor.value_sum(), 10.0, 1e-10);

        processor.process_array(&make_array_with_attr("Intensity", 20.0, 2), &pool);
        assert_close(processor.value(), 20.0, 1e-10);
        assert_close(processor.value_sum(), 30.0, 1e-10);
    }

    #[test]
    fn test_reset() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("Count");

        processor.process_array(&make_array_with_attr("Count", 100.0, 1), &pool);
        assert_close(processor.value_sum(), 100.0, 1e-10);

        // Resetting clears the sum but keeps the last value.
        processor.reset(0);
        assert_close(processor.value_sum(), 0.0, 1e-10);
        assert_close(processor.value(), 100.0, 1e-10);
    }

    #[test]
    fn test_special_attr_unique_id() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("NDArrayUniqueId");

        let mut arr = bare_array();
        arr.unique_id = 42;

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 42.0, 1e-10);
    }

    #[test]
    fn test_special_attr_timestamp() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("NDArrayTimeStamp");

        let mut arr = bare_array();
        arr.timestamp = ad_core_rs::timestamp::EpicsTimestamp {
            sec: 100,
            nsec: 500_000_000,
        };

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 100.5, 1e-9);
    }

    #[test]
    fn test_missing_attribute() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("NonExistent");

        processor.process_array(&bare_array(), &pool);

        assert_close(processor.value(), 0.0, 1e-10);
        assert_close(processor.value_sum(), 0.0, 1e-10);
    }

    #[test]
    fn test_string_attribute_ignored() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("Label");

        let arr = array_with("Label", NDAttrValue::String("hello".to_string()));

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 0.0, 1e-10);
    }

    #[test]
    fn test_int32_attribute() {
        let pool = test_pool();
        let mut processor = AttributeProcessor::new("Counter");

        let arr = array_with("Counter", NDAttrValue::Int32(7));

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 7.0, 1e-10);
    }

    #[test]
    fn test_set_attr_name() {
        let mut processor = AttributeProcessor::new("A");
        assert_eq!(processor.attr_name(), "A");

        processor.set_attr_name("B");
        assert_eq!(processor.attr_name(), "B");

        let pool = test_pool();
        processor.process_array(&make_array_with_attr("B", 99.0, 1), &pool);
        assert_close(processor.value(), 99.0, 1e-10);
    }
}
387}