1use ad_core_rs::ndarray::NDArray;
8use ad_core_rs::ndarray_pool::NDArrayPool;
9use ad_core_rs::plugin::runtime::{
10 NDPluginProcess, ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot,
11 ProcessResult,
12};
13use asyn_rs::error::AsynError;
14use asyn_rs::param::ParamType;
15use asyn_rs::port::PortDriverBase;
16
17use crate::time_series::{TimeSeriesData, TimeSeriesSender};
18
/// Number of attribute channels tracked by one [`AttributeProcessor`].
///
/// Sizes the processor's channel array and is the value handed to
/// `create_plugin_runtime_multi_addr` when the runtime is created.
const MAX_ATTR_CHANNELS: usize = 8;
21
/// Parameter indices used by the attribute plugin.
///
/// Each field holds the index returned by `create_param` for the matching
/// parameter; the `Default` value has every index set to `0`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AttributeParams {
    /// Index of the `ATTR_ATTRNAME` parameter (name of the tracked attribute).
    pub attr_name: usize,
    /// Index of the `ATTR_VAL` parameter (latest extracted value).
    pub value: usize,
    /// Index of the `ATTR_VAL_SUM` parameter (running sum of extracted values).
    pub value_sum: usize,
    /// Index of the `ATTR_RESET` parameter (non-zero write clears value and sum).
    pub reset: usize,
}
30
/// State of a single attribute channel.
///
/// The derived `Default` yields an unnamed channel with `value` and
/// `value_sum` both `0.0`, which is exactly what the previous hand-written
/// implementation produced.
#[derive(Debug, Clone, Default)]
struct AttrChannel {
    /// Name of the attribute to track; an empty name means the channel is unused.
    name: String,
    /// Most recently extracted value.
    value: f64,
    /// Sum of every value extracted since the last reset.
    value_sum: f64,
}
48
49impl AttrChannel {
50 fn extract_value(&self, array: &NDArray) -> Option<f64> {
51 if self.name.is_empty() {
52 return None;
53 }
54 match self.name.as_str() {
55 "NDArrayUniqueId" => Some(array.unique_id as f64),
56 "NDArrayTimeStamp" => Some(array.timestamp.as_f64()),
57 "NDArrayEpicsTSSec" => Some(array.timestamp.sec as f64),
58 "NDArrayEpicsTSnSec" => Some(array.timestamp.nsec as f64),
59 _ => array
60 .attributes
61 .get(&self.name)
62 .and_then(|attr| attr.value.as_f64()),
63 }
64 }
65}
66
67pub struct AttributeProcessor {
69 channels: [AttrChannel; MAX_ATTR_CHANNELS],
70 params: AttributeParams,
71 ts_sender: Option<TimeSeriesSender>,
72}
73
74impl AttributeProcessor {
75 pub fn new(attr_name: &str) -> Self {
76 let mut channels: [AttrChannel; MAX_ATTR_CHANNELS] = Default::default();
77 channels[0].name = attr_name.to_string();
78 Self {
79 channels,
80 params: AttributeParams::default(),
81 ts_sender: None,
82 }
83 }
84
85 pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
86 self.ts_sender = Some(sender);
87 }
88
89 pub fn params(&self) -> &AttributeParams {
91 &self.params
92 }
93
94 pub fn reset(&mut self) {
96 for ch in self.channels.iter_mut() {
97 ch.value = 0.0;
98 ch.value_sum = 0.0;
99 }
100 }
101
102 pub fn value(&self) -> f64 {
104 self.channels[0].value
105 }
106
107 pub fn value_sum(&self) -> f64 {
109 self.channels[0].value_sum
110 }
111
112 pub fn attr_name(&self) -> &str {
114 &self.channels[0].name
115 }
116
117 pub fn set_attr_name(&mut self, name: &str) {
119 self.channels[0].name = name.to_string();
120 }
121}
122
123impl NDPluginProcess for AttributeProcessor {
124 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
125 let mut updates = Vec::new();
126
127 for (i, ch) in self.channels.iter_mut().enumerate() {
128 if ch.name.is_empty() {
129 continue;
130 }
131 if let Some(val) = ch.extract_value(array) {
132 ch.value = val;
133 ch.value_sum += val;
134 }
135 let addr = i as i32;
136 updates.push(ParamUpdate::float64_addr(self.params.value, addr, ch.value));
137 updates.push(ParamUpdate::float64_addr(
138 self.params.value_sum,
139 addr,
140 ch.value_sum,
141 ));
142 }
143
144 if let Some(ref sender) = self.ts_sender {
146 let values: Vec<f64> = self.channels.iter().map(|ch| ch.value).collect();
147 let _ = sender.try_send(TimeSeriesData { values });
148 }
149
150 ProcessResult::sink(updates)
151 }
152
153 fn plugin_type(&self) -> &str {
154 "NDPluginAttribute"
155 }
156
157 fn register_params(&mut self, base: &mut PortDriverBase) -> Result<(), AsynError> {
158 self.params.attr_name = base.create_param("ATTR_ATTRNAME", ParamType::Octet)?;
159 self.params.value = base.create_param("ATTR_VAL", ParamType::Float64)?;
160 self.params.value_sum = base.create_param("ATTR_VAL_SUM", ParamType::Float64)?;
161 self.params.reset = base.create_param("ATTR_RESET", ParamType::Int32)?;
162 Ok(())
163 }
164
165 fn on_param_change(
166 &mut self,
167 reason: usize,
168 params: &PluginParamSnapshot,
169 ) -> ParamChangeResult {
170 let addr = params.addr as usize;
171
172 if reason == self.params.attr_name {
173 if addr < MAX_ATTR_CHANNELS {
174 if let ParamChangeValue::Octet(s) = ¶ms.value {
175 self.channels[addr].name = s.clone();
176 }
177 }
178 } else if reason == self.params.reset {
179 if params.value.as_i32() != 0 {
180 let mut updates = Vec::new();
181 for (i, ch) in self.channels.iter_mut().enumerate() {
182 ch.value = 0.0;
183 ch.value_sum = 0.0;
184 let a = i as i32;
185 updates.push(ParamUpdate::float64_addr(self.params.value, a, 0.0));
186 updates.push(ParamUpdate::float64_addr(self.params.value_sum, a, 0.0));
187 }
188 return ParamChangeResult::updates(updates);
189 }
190 }
191
192 ParamChangeResult::updates(vec![])
193 }
194}
195
/// Returns the eight time-series channel names in order: `"TSArrayValue"`
/// followed by `"TSArrayValue1"` through `"TSArrayValue7"`.
pub fn attr_ts_channel_names() -> Vec<&'static str> {
    const NAMES: [&str; 8] = [
        "TSArrayValue",
        "TSArrayValue1",
        "TSArrayValue2",
        "TSArrayValue3",
        "TSArrayValue4",
        "TSArrayValue5",
        "TSArrayValue6",
        "TSArrayValue7",
    ];
    NAMES.to_vec()
}
209
210pub fn create_attribute_runtime(
213 port_name: &str,
214 pool: std::sync::Arc<ad_core_rs::ndarray_pool::NDArrayPool>,
215 queue_size: usize,
216 ndarray_port: &str,
217 wiring: std::sync::Arc<ad_core_rs::plugin::wiring::WiringRegistry>,
218 ts_registry: &crate::time_series::TsReceiverRegistry,
219) -> (
220 ad_core_rs::plugin::runtime::PluginRuntimeHandle,
221 std::thread::JoinHandle<()>,
222) {
223 let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
224
225 let mut processor = AttributeProcessor::new("");
226 processor.set_ts_sender(ts_tx);
227
228 let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
229 port_name,
230 processor,
231 pool,
232 queue_size,
233 ndarray_port,
234 wiring,
235 MAX_ATTR_CHANNELS,
236 );
237
238 let channel_names: Vec<String> = attr_ts_channel_names()
239 .iter()
240 .map(|s| s.to_string())
241 .collect();
242 ts_registry.store(port_name, ts_rx, channel_names);
243
244 (handle, data_jh)
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
    use ad_core_rs::ndarray::{NDDataType, NDDimension};

    /// Pool handed to `process_array`; the processor never uses it.
    fn test_pool() -> NDArrayPool {
        NDArrayPool::new(1_000_000)
    }

    /// Four-element `UInt8` array with no attributes attached.
    fn bare_array() -> NDArray {
        NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8)
    }

    /// Bare array carrying one driver-sourced attribute `name` = `value`.
    fn array_with_value(name: &str, value: NDAttrValue) -> NDArray {
        let mut arr = bare_array();
        arr.attributes.add(NDAttribute {
            name: name.to_string(),
            description: String::new(),
            source: NDAttrSource::Driver,
            value,
        });
        arr
    }

    /// Bare array with unique id `uid` and a `Float64` attribute.
    fn make_array_with_attr(name: &str, value: f64, uid: i32) -> NDArray {
        let mut arr = array_with_value(name, NDAttrValue::Float64(value));
        arr.unique_id = uid;
        arr
    }

    /// Asserts `|actual - expected| < tol`, reporting the caller's location.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64, tol: f64) {
        assert!(
            (actual - expected).abs() < tol,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_extract_named_attribute() {
        let mut processor = AttributeProcessor::new("Temperature");
        let pool = test_pool();

        let arr = make_array_with_attr("Temperature", 25.5, 1);
        let result = processor.process_array(&arr, &pool);

        assert!(
            result.output_arrays.is_empty(),
            "attribute plugin is a sink"
        );
        assert_close(processor.value(), 25.5, 1e-10);
        assert_close(processor.value_sum(), 25.5, 1e-10);
    }

    #[test]
    fn test_sum_accumulation() {
        let mut processor = AttributeProcessor::new("Intensity");
        let pool = test_pool();

        let first = make_array_with_attr("Intensity", 10.0, 1);
        processor.process_array(&first, &pool);
        assert_close(processor.value_sum(), 10.0, 1e-10);

        let second = make_array_with_attr("Intensity", 20.0, 2);
        processor.process_array(&second, &pool);
        assert_close(processor.value(), 20.0, 1e-10);
        assert_close(processor.value_sum(), 30.0, 1e-10);
    }

    #[test]
    fn test_reset() {
        let mut processor = AttributeProcessor::new("Count");
        let pool = test_pool();

        let arr = make_array_with_attr("Count", 100.0, 1);
        processor.process_array(&arr, &pool);
        assert_close(processor.value_sum(), 100.0, 1e-10);

        processor.reset();
        assert_close(processor.value_sum(), 0.0, 1e-10);
        assert_close(processor.value(), 0.0, 1e-10);
    }

    #[test]
    fn test_special_attr_unique_id() {
        let mut processor = AttributeProcessor::new("NDArrayUniqueId");
        let pool = test_pool();

        let mut arr = bare_array();
        arr.unique_id = 42;

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 42.0, 1e-10);
    }

    #[test]
    fn test_special_attr_timestamp() {
        let mut processor = AttributeProcessor::new("NDArrayTimeStamp");
        let pool = test_pool();

        let mut arr = bare_array();
        arr.timestamp = ad_core_rs::timestamp::EpicsTimestamp {
            sec: 100,
            nsec: 500_000_000,
        };

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 100.5, 1e-9);
    }

    #[test]
    fn test_missing_attribute() {
        let mut processor = AttributeProcessor::new("NonExistent");
        let pool = test_pool();

        let arr = bare_array();
        processor.process_array(&arr, &pool);

        assert_close(processor.value(), 0.0, 1e-10);
        assert_close(processor.value_sum(), 0.0, 1e-10);
    }

    #[test]
    fn test_string_attribute_ignored() {
        let mut processor = AttributeProcessor::new("Label");
        let pool = test_pool();

        let arr = array_with_value("Label", NDAttrValue::String("hello".to_string()));

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 0.0, 1e-10);
    }

    #[test]
    fn test_int32_attribute() {
        let mut processor = AttributeProcessor::new("Counter");
        let pool = test_pool();

        let arr = array_with_value("Counter", NDAttrValue::Int32(7));

        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 7.0, 1e-10);
    }

    #[test]
    fn test_set_attr_name() {
        let mut processor = AttributeProcessor::new("A");
        assert_eq!(processor.attr_name(), "A");

        processor.set_attr_name("B");
        assert_eq!(processor.attr_name(), "B");

        let pool = test_pool();
        let arr = make_array_with_attr("B", 99.0, 1);
        processor.process_array(&arr, &pool);
        assert_close(processor.value(), 99.0, 1e-10);
    }
}