ad-plugins-rs 0.13.5

NDPlugin implementations for areaDetector-rs
Documentation
//! NDPluginAttribute: extracts named attribute values from each array.
//!
//! Supports up to 8 attribute channels (addr 0..7), each tracking a different
//! attribute by name. Special pseudo-attribute names "NDArrayUniqueId" and
//! "NDArrayTimeStamp" read from the array header.

use ad_core_rs::ndarray::NDArray;
use ad_core_rs::ndarray_pool::NDArrayPool;
use ad_core_rs::plugin::runtime::{
    NDPluginProcess, ParamChangeResult, ParamChangeValue, ParamUpdate, PluginParamSnapshot,
    ProcessResult,
};
use asyn_rs::error::AsynError;
use asyn_rs::param::ParamType;
use asyn_rs::port::PortDriverBase;

use crate::time_series::{TimeSeriesData, TimeSeriesSender};

/// Maximum number of attribute channels.
const MAX_ATTR_CHANNELS: usize = 8;

/// Parameter indices for NDPluginAttribute.
#[derive(Clone, Copy, Default)]
pub struct AttributeParams {
    pub attr_name: usize,
    pub value: usize,
    pub value_sum: usize,
    pub reset: usize,
}

/// State for a single attribute channel.
#[derive(Clone)]
struct AttrChannel {
    name: String,
    value: f64,
    value_sum: f64,
}

impl Default for AttrChannel {
    fn default() -> Self {
        Self {
            name: String::new(),
            value: 0.0,
            value_sum: 0.0,
        }
    }
}

impl AttrChannel {
    fn extract_value(&self, array: &NDArray) -> Option<f64> {
        if self.name.is_empty() {
            return None;
        }
        match self.name.as_str() {
            "NDArrayUniqueId" => Some(array.unique_id as f64),
            "NDArrayTimeStamp" => Some(array.timestamp.as_f64()),
            "NDArrayEpicsTSSec" => Some(array.timestamp.sec as f64),
            "NDArrayEpicsTSnSec" => Some(array.timestamp.nsec as f64),
            _ => array
                .attributes
                .get(&self.name)
                .and_then(|attr| attr.value.as_f64()),
        }
    }
}

/// Processor that extracts multiple attribute values from each array.
pub struct AttributeProcessor {
    channels: [AttrChannel; MAX_ATTR_CHANNELS],
    params: AttributeParams,
    ts_sender: Option<TimeSeriesSender>,
}

impl AttributeProcessor {
    pub fn new(attr_name: &str) -> Self {
        let mut channels: [AttrChannel; MAX_ATTR_CHANNELS] = Default::default();
        channels[0].name = attr_name.to_string();
        Self {
            channels,
            params: AttributeParams::default(),
            ts_sender: None,
        }
    }

    pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
        self.ts_sender = Some(sender);
    }

    /// Access the registered param indices (populated after register_params).
    pub fn params(&self) -> &AttributeParams {
        &self.params
    }

    /// Reset value and value_sum for all channels (C parity: resets all, not just one).
    pub fn reset(&mut self) {
        for ch in self.channels.iter_mut() {
            ch.value = 0.0;
            ch.value_sum = 0.0;
        }
    }

    /// Current extracted value for channel 0.
    pub fn value(&self) -> f64 {
        self.channels[0].value
    }

    /// Current accumulated sum for channel 0.
    pub fn value_sum(&self) -> f64 {
        self.channels[0].value_sum
    }

    /// The attribute name being tracked by channel 0.
    pub fn attr_name(&self) -> &str {
        &self.channels[0].name
    }

    /// Set the attribute name for channel 0.
    pub fn set_attr_name(&mut self, name: &str) {
        self.channels[0].name = name.to_string();
    }
}

impl NDPluginProcess for AttributeProcessor {
    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
        let mut updates = Vec::new();

        for (i, ch) in self.channels.iter_mut().enumerate() {
            if ch.name.is_empty() {
                continue;
            }
            if let Some(val) = ch.extract_value(array) {
                ch.value = val;
                ch.value_sum += val;
            }
            let addr = i as i32;
            updates.push(ParamUpdate::float64_addr(self.params.value, addr, ch.value));
            updates.push(ParamUpdate::float64_addr(
                self.params.value_sum,
                addr,
                ch.value_sum,
            ));
        }

        // Send to time series
        if let Some(ref sender) = self.ts_sender {
            let values: Vec<f64> = self.channels.iter().map(|ch| ch.value).collect();
            let _ = sender.try_send(TimeSeriesData { values });
        }

        ProcessResult::sink(updates)
    }

    fn plugin_type(&self) -> &str {
        "NDPluginAttribute"
    }

    fn register_params(&mut self, base: &mut PortDriverBase) -> Result<(), AsynError> {
        self.params.attr_name = base.create_param("ATTR_ATTRNAME", ParamType::Octet)?;
        self.params.value = base.create_param("ATTR_VAL", ParamType::Float64)?;
        self.params.value_sum = base.create_param("ATTR_VAL_SUM", ParamType::Float64)?;
        self.params.reset = base.create_param("ATTR_RESET", ParamType::Int32)?;
        Ok(())
    }

    fn on_param_change(
        &mut self,
        reason: usize,
        params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        let addr = params.addr as usize;

        if reason == self.params.attr_name {
            if addr < MAX_ATTR_CHANNELS {
                if let ParamChangeValue::Octet(s) = &params.value {
                    self.channels[addr].name = s.clone();
                }
            }
        } else if reason == self.params.reset {
            if params.value.as_i32() != 0 {
                let mut updates = Vec::new();
                for (i, ch) in self.channels.iter_mut().enumerate() {
                    ch.value = 0.0;
                    ch.value_sum = 0.0;
                    let a = i as i32;
                    updates.push(ParamUpdate::float64_addr(self.params.value, a, 0.0));
                    updates.push(ParamUpdate::float64_addr(self.params.value_sum, a, 0.0));
                }
                return ParamChangeResult::updates(updates);
            }
        }

        ParamChangeResult::updates(vec![])
    }
}

/// Channel names for time series (one per attribute channel).
pub fn attr_ts_channel_names() -> Vec<&'static str> {
    vec![
        "TSArrayValue",
        "TSArrayValue1",
        "TSArrayValue2",
        "TSArrayValue3",
        "TSArrayValue4",
        "TSArrayValue5",
        "TSArrayValue6",
        "TSArrayValue7",
    ]
}

/// Create an Attribute plugin runtime. The TS receiver is stored in the registry
/// for later pickup by `NDTimeSeriesConfigure`.
pub fn create_attribute_runtime(
    port_name: &str,
    pool: std::sync::Arc<ad_core_rs::ndarray_pool::NDArrayPool>,
    queue_size: usize,
    ndarray_port: &str,
    wiring: std::sync::Arc<ad_core_rs::plugin::wiring::WiringRegistry>,
    ts_registry: &crate::time_series::TsReceiverRegistry,
) -> (
    ad_core_rs::plugin::runtime::PluginRuntimeHandle,
    std::thread::JoinHandle<()>,
) {
    let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);

    let mut processor = AttributeProcessor::new("");
    processor.set_ts_sender(ts_tx);

    let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
        port_name,
        processor,
        pool,
        queue_size,
        ndarray_port,
        wiring,
        MAX_ATTR_CHANNELS,
    );

    let channel_names: Vec<String> = attr_ts_channel_names()
        .iter()
        .map(|s| s.to_string())
        .collect();
    ts_registry.store(port_name, ts_rx, channel_names);

    (handle, data_jh)
}

#[cfg(test)]
mod tests {
    use super::*;
    use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
    use ad_core_rs::ndarray::{NDDataType, NDDimension};

    fn make_array_with_attr(name: &str, value: f64, uid: i32) -> NDArray {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = uid;
        arr.attributes.add(NDAttribute {
            name: name.to_string(),
            description: String::new(),
            source: NDAttrSource::Driver,
            value: NDAttrValue::Float64(value),
        });
        arr
    }

    #[test]
    fn test_extract_named_attribute() {
        let mut proc = AttributeProcessor::new("Temperature");
        let pool = NDArrayPool::new(1_000_000);

        let arr = make_array_with_attr("Temperature", 25.5, 1);
        let result = proc.process_array(&arr, &pool);

        assert!(
            result.output_arrays.is_empty(),
            "attribute plugin is a sink"
        );
        assert!((proc.value() - 25.5).abs() < 1e-10);
        assert!((proc.value_sum() - 25.5).abs() < 1e-10);
    }

    #[test]
    fn test_sum_accumulation() {
        let mut proc = AttributeProcessor::new("Intensity");
        let pool = NDArrayPool::new(1_000_000);

        let arr1 = make_array_with_attr("Intensity", 10.0, 1);
        proc.process_array(&arr1, &pool);
        assert!((proc.value_sum() - 10.0).abs() < 1e-10);

        let arr2 = make_array_with_attr("Intensity", 20.0, 2);
        proc.process_array(&arr2, &pool);
        assert!((proc.value() - 20.0).abs() < 1e-10);
        assert!((proc.value_sum() - 30.0).abs() < 1e-10);
    }

    #[test]
    fn test_reset() {
        let mut proc = AttributeProcessor::new("Count");
        let pool = NDArrayPool::new(1_000_000);

        let arr1 = make_array_with_attr("Count", 100.0, 1);
        proc.process_array(&arr1, &pool);
        assert!((proc.value_sum() - 100.0).abs() < 1e-10);

        proc.reset();
        assert!((proc.value_sum() - 0.0).abs() < 1e-10);
        assert!((proc.value() - 0.0).abs() < 1e-10);
    }

    #[test]
    fn test_special_attr_unique_id() {
        let mut proc = AttributeProcessor::new("NDArrayUniqueId");
        let pool = NDArrayPool::new(1_000_000);

        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = 42;

        proc.process_array(&arr, &pool);
        assert!((proc.value() - 42.0).abs() < 1e-10);
    }

    #[test]
    fn test_special_attr_timestamp() {
        let mut proc = AttributeProcessor::new("NDArrayTimeStamp");
        let pool = NDArrayPool::new(1_000_000);

        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.timestamp = ad_core_rs::timestamp::EpicsTimestamp {
            sec: 100,
            nsec: 500_000_000,
        };

        proc.process_array(&arr, &pool);
        assert!((proc.value() - 100.5).abs() < 1e-9);
    }

    #[test]
    fn test_missing_attribute() {
        let mut proc = AttributeProcessor::new("NonExistent");
        let pool = NDArrayPool::new(1_000_000);

        let arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        proc.process_array(&arr, &pool);

        assert!((proc.value() - 0.0).abs() < 1e-10);
        assert!((proc.value_sum() - 0.0).abs() < 1e-10);
    }

    #[test]
    fn test_string_attribute_ignored() {
        let mut proc = AttributeProcessor::new("Label");
        let pool = NDArrayPool::new(1_000_000);

        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.attributes.add(NDAttribute {
            name: "Label".to_string(),
            description: String::new(),
            source: NDAttrSource::Driver,
            value: NDAttrValue::String("hello".to_string()),
        });

        proc.process_array(&arr, &pool);
        assert!((proc.value() - 0.0).abs() < 1e-10);
    }

    #[test]
    fn test_int32_attribute() {
        let mut proc = AttributeProcessor::new("Counter");
        let pool = NDArrayPool::new(1_000_000);

        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.attributes.add(NDAttribute {
            name: "Counter".to_string(),
            description: String::new(),
            source: NDAttrSource::Driver,
            value: NDAttrValue::Int32(7),
        });

        proc.process_array(&arr, &pool);
        assert!((proc.value() - 7.0).abs() < 1e-10);
    }

    #[test]
    fn test_set_attr_name() {
        let mut proc = AttributeProcessor::new("A");
        assert_eq!(proc.attr_name(), "A");

        proc.set_attr_name("B");
        assert_eq!(proc.attr_name(), "B");

        let pool = NDArrayPool::new(1_000_000);
        let arr = make_array_with_attr("B", 99.0, 1);
        proc.process_array(&arr, &pool);
        assert!((proc.value() - 99.0).abs() < 1e-10);
    }
}