use ad_core_rs::ndarray::NDArray;
use ad_core_rs::ndarray_pool::NDArrayPool;
use ad_core_rs::plugin::runtime::{
NDPluginProcess, ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot,
ProcessResult,
};
use asyn_rs::error::AsynError;
use asyn_rs::param::ParamType;
use asyn_rs::port::PortDriverBase;
use crate::time_series::{TimeSeriesData, TimeSeriesSender};
const MAX_ATTR_CHANNELS: usize = 8;
#[derive(Clone, Copy, Default)]
pub struct AttributeParams {
pub attr_name: usize,
pub value: usize,
pub value_sum: usize,
pub reset: usize,
}
#[derive(Clone)]
struct AttrChannel {
name: String,
value: f64,
value_sum: f64,
}
impl Default for AttrChannel {
fn default() -> Self {
Self {
name: String::new(),
value: 0.0,
value_sum: 0.0,
}
}
}
impl AttrChannel {
fn extract_value(&self, array: &NDArray) -> Option<f64> {
if self.name.is_empty() {
return None;
}
match self.name.as_str() {
"NDArrayUniqueId" => Some(array.unique_id as f64),
"NDArrayTimeStamp" => Some(array.timestamp.as_f64()),
_ => array
.attributes
.get(&self.name)
.and_then(|attr| attr.value.as_f64()),
}
}
}
pub struct AttributeProcessor {
channels: [AttrChannel; MAX_ATTR_CHANNELS],
params: AttributeParams,
ts_sender: Option<TimeSeriesSender>,
}
impl AttributeProcessor {
pub fn new(attr_name: &str) -> Self {
let mut channels: [AttrChannel; MAX_ATTR_CHANNELS] = Default::default();
channels[0].name = attr_name.to_string();
Self {
channels,
params: AttributeParams::default(),
ts_sender: None,
}
}
pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
self.ts_sender = Some(sender);
}
pub fn params(&self) -> &AttributeParams {
&self.params
}
pub fn reset(&mut self, addr: usize) {
if addr < MAX_ATTR_CHANNELS {
self.channels[addr].value_sum = 0.0;
}
}
pub fn value(&self) -> f64 {
self.channels[0].value
}
pub fn value_sum(&self) -> f64 {
self.channels[0].value_sum
}
pub fn attr_name(&self) -> &str {
&self.channels[0].name
}
pub fn set_attr_name(&mut self, name: &str) {
self.channels[0].name = name.to_string();
}
}
impl NDPluginProcess for AttributeProcessor {
fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
let mut updates = Vec::new();
for (i, ch) in self.channels.iter_mut().enumerate() {
if ch.name.is_empty() {
continue;
}
if let Some(val) = ch.extract_value(array) {
ch.value = val;
ch.value_sum += val;
}
let addr = i as i32;
updates.push(ParamUpdate::float64_addr(self.params.value, addr, ch.value));
updates.push(ParamUpdate::float64_addr(
self.params.value_sum,
addr,
ch.value_sum,
));
}
if let Some(ref sender) = self.ts_sender {
let values: Vec<f64> = self.channels.iter().map(|ch| ch.value).collect();
let _ = sender.try_send(TimeSeriesData { values });
}
ProcessResult::sink(updates)
}
fn plugin_type(&self) -> &str {
"NDPluginAttribute"
}
fn register_params(&mut self, base: &mut PortDriverBase) -> Result<(), AsynError> {
self.params.attr_name = base.create_param("ATTR_NAME", ParamType::Octet)?;
self.params.value = base.create_param("ATTR_VAL", ParamType::Float64)?;
self.params.value_sum = base.create_param("ATTR_VAL_SUM", ParamType::Float64)?;
self.params.reset = base.create_param("ATTR_RESET", ParamType::Int32)?;
Ok(())
}
fn on_param_change(
&mut self,
reason: usize,
params: &PluginParamSnapshot,
) -> ParamChangeResult {
let addr = params.addr as usize;
if reason == self.params.attr_name {
if addr < MAX_ATTR_CHANNELS {
if let ParamChangeValue::Octet(s) = ¶ms.value {
self.channels[addr].name = s.clone();
}
}
} else if reason == self.params.reset {
if params.value.as_i32() != 0 {
if addr < MAX_ATTR_CHANNELS {
self.channels[addr].value_sum = 0.0;
}
}
}
ParamChangeResult::updates(vec![])
}
}
pub fn attr_ts_channel_names() -> Vec<&'static str> {
vec![
"TSArrayValue",
"TSArrayValue1",
"TSArrayValue2",
"TSArrayValue3",
"TSArrayValue4",
"TSArrayValue5",
"TSArrayValue6",
"TSArrayValue7",
]
}
pub fn create_attribute_runtime(
port_name: &str,
pool: std::sync::Arc<ad_core_rs::ndarray_pool::NDArrayPool>,
queue_size: usize,
ndarray_port: &str,
wiring: std::sync::Arc<ad_core_rs::plugin::wiring::WiringRegistry>,
ts_registry: &crate::time_series::TsReceiverRegistry,
) -> (
ad_core_rs::plugin::runtime::PluginRuntimeHandle,
std::thread::JoinHandle<()>,
) {
let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
let mut processor = AttributeProcessor::new("");
processor.set_ts_sender(ts_tx);
let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
port_name,
processor,
pool,
queue_size,
ndarray_port,
wiring,
MAX_ATTR_CHANNELS,
);
let channel_names: Vec<String> = attr_ts_channel_names()
.iter()
.map(|s| s.to_string())
.collect();
ts_registry.store(port_name, ts_rx, channel_names);
(handle, data_jh)
}
#[cfg(test)]
mod tests {
use super::*;
use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
use ad_core_rs::ndarray::{NDDataType, NDDimension};
fn make_array_with_attr(name: &str, value: f64, uid: i32) -> NDArray {
let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr.unique_id = uid;
arr.attributes.add(NDAttribute {
name: name.to_string(),
description: String::new(),
source: NDAttrSource::Driver,
value: NDAttrValue::Float64(value),
});
arr
}
#[test]
fn test_extract_named_attribute() {
let mut proc = AttributeProcessor::new("Temperature");
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attr("Temperature", 25.5, 1);
let result = proc.process_array(&arr, &pool);
assert!(
result.output_arrays.is_empty(),
"attribute plugin is a sink"
);
assert!((proc.value() - 25.5).abs() < 1e-10);
assert!((proc.value_sum() - 25.5).abs() < 1e-10);
}
#[test]
fn test_sum_accumulation() {
let mut proc = AttributeProcessor::new("Intensity");
let pool = NDArrayPool::new(1_000_000);
let arr1 = make_array_with_attr("Intensity", 10.0, 1);
proc.process_array(&arr1, &pool);
assert!((proc.value_sum() - 10.0).abs() < 1e-10);
let arr2 = make_array_with_attr("Intensity", 20.0, 2);
proc.process_array(&arr2, &pool);
assert!((proc.value() - 20.0).abs() < 1e-10);
assert!((proc.value_sum() - 30.0).abs() < 1e-10);
}
#[test]
fn test_reset() {
let mut proc = AttributeProcessor::new("Count");
let pool = NDArrayPool::new(1_000_000);
let arr1 = make_array_with_attr("Count", 100.0, 1);
proc.process_array(&arr1, &pool);
assert!((proc.value_sum() - 100.0).abs() < 1e-10);
proc.reset(0);
assert!((proc.value_sum() - 0.0).abs() < 1e-10);
assert!((proc.value() - 100.0).abs() < 1e-10);
}
#[test]
fn test_special_attr_unique_id() {
let mut proc = AttributeProcessor::new("NDArrayUniqueId");
let pool = NDArrayPool::new(1_000_000);
let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr.unique_id = 42;
proc.process_array(&arr, &pool);
assert!((proc.value() - 42.0).abs() < 1e-10);
}
#[test]
fn test_special_attr_timestamp() {
let mut proc = AttributeProcessor::new("NDArrayTimeStamp");
let pool = NDArrayPool::new(1_000_000);
let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr.timestamp = ad_core_rs::timestamp::EpicsTimestamp {
sec: 100,
nsec: 500_000_000,
};
proc.process_array(&arr, &pool);
assert!((proc.value() - 100.5).abs() < 1e-9);
}
#[test]
fn test_missing_attribute() {
let mut proc = AttributeProcessor::new("NonExistent");
let pool = NDArrayPool::new(1_000_000);
let arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
proc.process_array(&arr, &pool);
assert!((proc.value() - 0.0).abs() < 1e-10);
assert!((proc.value_sum() - 0.0).abs() < 1e-10);
}
#[test]
fn test_string_attribute_ignored() {
let mut proc = AttributeProcessor::new("Label");
let pool = NDArrayPool::new(1_000_000);
let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr.attributes.add(NDAttribute {
name: "Label".to_string(),
description: String::new(),
source: NDAttrSource::Driver,
value: NDAttrValue::String("hello".to_string()),
});
proc.process_array(&arr, &pool);
assert!((proc.value() - 0.0).abs() < 1e-10);
}
#[test]
fn test_int32_attribute() {
let mut proc = AttributeProcessor::new("Counter");
let pool = NDArrayPool::new(1_000_000);
let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr.attributes.add(NDAttribute {
name: "Counter".to_string(),
description: String::new(),
source: NDAttrSource::Driver,
value: NDAttrValue::Int32(7),
});
proc.process_array(&arr, &pool);
assert!((proc.value() - 7.0).abs() < 1e-10);
}
#[test]
fn test_set_attr_name() {
let mut proc = AttributeProcessor::new("A");
assert_eq!(proc.attr_name(), "A");
proc.set_attr_name("B");
assert_eq!(proc.attr_name(), "B");
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attr("B", 99.0, 1);
proc.process_array(&arr, &pool);
assert!((proc.value() - 99.0).abs() < 1e-10);
}
}