use std::collections::VecDeque;
use ad_core_rs::ndarray::NDArray;
use ad_core_rs::ndarray_pool::NDArrayPool;
use ad_core_rs::plugin::runtime::{
NDPluginProcess, ParamChangeResult, ParamUpdate, PluginParamSnapshot, ProcessResult,
};
pub const ATTRPLOT_UID_INDEX: i32 = -1;
pub const ATTRPLOT_NONE_INDEX: i32 = -2;
pub const ATTRPLOT_UID_LABEL: &str = "UID";
pub const ATTRPLOT_NONE_LABEL: &str = "None";
pub struct AttrPlotProcessor {
n_attributes: usize,
n_data_blocks: usize,
cache_size: usize,
attributes: Vec<String>,
buffers: Vec<VecDeque<f64>>,
uid_buffer: VecDeque<f64>,
data_selections: Vec<i32>,
initialized: bool,
last_uid: i32,
params: AttrPlotParams,
}
#[derive(Default)]
struct AttrPlotParams {
data: Option<usize>,
data_label: Option<usize>,
data_select: Option<usize>,
attribute: Option<usize>,
reset: Option<usize>,
npts: Option<usize>,
}
impl AttrPlotProcessor {
pub fn new(n_attributes: usize, cache_size: usize, n_data_blocks: usize) -> Self {
Self {
n_attributes,
n_data_blocks,
cache_size,
attributes: Vec::new(),
buffers: Vec::new(),
uid_buffer: VecDeque::new(),
data_selections: vec![ATTRPLOT_NONE_INDEX; n_data_blocks],
initialized: false,
last_uid: -1,
params: AttrPlotParams::default(),
}
}
pub fn attributes(&self) -> &[String] {
&self.attributes
}
pub fn buffer(&self, index: usize) -> Option<&VecDeque<f64>> {
self.buffers.get(index)
}
pub fn uid_buffer(&self) -> &VecDeque<f64> {
&self.uid_buffer
}
pub fn num_attributes(&self) -> usize {
self.attributes.len()
}
pub fn num_data_blocks(&self) -> usize {
self.n_data_blocks
}
pub fn find_attribute(&self, name: &str) -> Option<usize> {
self.attributes.iter().position(|n| n == name)
}
pub fn data_select(&self, block: usize) -> Option<i32> {
self.data_selections.get(block).copied()
}
pub fn set_data_select(&mut self, block: usize, value: i32) -> Result<(), &'static str> {
if block >= self.n_data_blocks {
return Err("data block index out of range");
}
if value >= 0 && (value as usize) >= self.attributes.len() {
return Err("attribute selection out of range");
}
self.data_selections[block] = value;
Ok(())
}
pub fn data_label(&self, block: usize) -> String {
match self.data_selections.get(block).copied() {
Some(ATTRPLOT_UID_INDEX) => ATTRPLOT_UID_LABEL.to_string(),
Some(sel) if sel >= 0 && (sel as usize) < self.attributes.len() => {
self.attributes[sel as usize].clone()
}
_ => ATTRPLOT_NONE_LABEL.to_string(),
}
}
pub fn reset(&mut self) {
self.initialized = false;
self.uid_buffer.clear();
for buf in &mut self.buffers {
buf.clear();
}
self.last_uid = -1;
}
fn push_capped(buf: &mut VecDeque<f64>, value: f64, cache_size: usize) {
if cache_size > 0 && buf.len() >= cache_size {
buf.pop_front();
}
buf.push_back(value);
}
fn rebuild_attributes(&mut self, array: &NDArray) {
let prior: Vec<Option<String>> = self
.data_selections
.iter()
.map(|&sel| match sel {
ATTRPLOT_UID_INDEX => Some(ATTRPLOT_UID_LABEL.to_string()),
s if s >= 0 && (s as usize) < self.attributes.len() => {
Some(self.attributes[s as usize].clone())
}
_ => None,
})
.collect();
let mut names: Vec<String> = Vec::new();
for attr in array.attributes.iter() {
if attr.value.as_f64().is_some() {
names.push(attr.name.clone());
}
}
names.sort();
names.truncate(self.n_attributes);
self.buffers = vec![VecDeque::new(); names.len()];
self.attributes = names;
for (i, want) in prior.into_iter().enumerate() {
self.data_selections[i] = match want {
Some(ref n) if n == ATTRPLOT_UID_LABEL => ATTRPLOT_UID_INDEX,
Some(n) => self
.attributes
.iter()
.position(|a| a == &n)
.map(|p| p as i32)
.unwrap_or(ATTRPLOT_NONE_INDEX),
None => ATTRPLOT_NONE_INDEX,
};
}
self.initialized = true;
}
fn push_data(&mut self, array: &NDArray) {
Self::push_capped(
&mut self.uid_buffer,
array.unique_id as f64,
self.cache_size,
);
for (i, name) in self.attributes.iter().enumerate() {
let value = array
.attributes
.get(name)
.and_then(|attr| attr.value.as_f64())
.unwrap_or(f64::NAN);
Self::push_capped(&mut self.buffers[i], value, self.cache_size);
}
}
fn block_waveform(&self, block: usize) -> Vec<f64> {
let selected = self
.data_selections
.get(block)
.copied()
.unwrap_or(ATTRPLOT_NONE_INDEX);
let src: Option<&VecDeque<f64>> = match selected {
ATTRPLOT_UID_INDEX => Some(&self.uid_buffer),
s if s >= 0 && (s as usize) < self.buffers.len() => Some(&self.buffers[s as usize]),
_ => None,
};
let size = self.uid_buffer.len();
let target = if self.cache_size > 0 {
self.cache_size
} else {
size
};
let mut out: Vec<f64> = match src {
Some(buf) => buf.iter().copied().collect(),
None => vec![f64::NAN; size],
};
let pad = out.last().copied().unwrap_or(f64::NAN);
if out.len() < target {
out.resize(target, pad);
} else {
out.truncate(target);
}
out
}
fn build_updates(&self) -> Vec<ParamUpdate> {
let mut updates = Vec::new();
if let Some(data) = self.params.data {
for block in 0..self.n_data_blocks {
updates.push(ParamUpdate::float64_array_addr(
data,
block as i32,
self.block_waveform(block),
));
}
}
if let Some(label) = self.params.data_label {
for block in 0..self.n_data_blocks {
updates.push(ParamUpdate::octet_addr(
label,
block as i32,
self.data_label(block),
));
}
}
if let Some(select) = self.params.data_select {
for block in 0..self.n_data_blocks {
updates.push(ParamUpdate::int32_addr(
select,
block as i32,
self.data_selections[block],
));
}
}
if let Some(attribute) = self.params.attribute {
for i in 0..self.n_attributes {
let name = self.attributes.get(i).cloned().unwrap_or_default();
updates.push(ParamUpdate::octet_addr(attribute, i as i32, name));
}
}
if let Some(npts) = self.params.npts {
updates.push(ParamUpdate::int32(npts, self.uid_buffer.len() as i32));
}
updates
}
}
impl NDPluginProcess for AttrPlotProcessor {
fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
if !self.uid_buffer.is_empty() && array.unique_id <= self.last_uid {
self.reset();
}
self.last_uid = array.unique_id;
if !self.initialized {
self.rebuild_attributes(array);
}
self.push_data(array);
ProcessResult::sink(self.build_updates())
}
fn plugin_type(&self) -> &str {
"NDPluginAttrPlot"
}
fn register_params(
&mut self,
base: &mut asyn_rs::port::PortDriverBase,
) -> asyn_rs::error::AsynResult<()> {
use asyn_rs::param::ParamType;
base.create_param("AP_Data", ParamType::Float64Array)?;
base.create_param("AP_DataLabel", ParamType::Octet)?;
base.create_param("AP_DataSelect", ParamType::Int32)?;
base.create_param("AP_Attribute", ParamType::Octet)?;
base.create_param("AP_Reset", ParamType::Int32)?;
base.create_param("AP_NPts", ParamType::Int32)?;
self.params.data = base.find_param("AP_Data");
self.params.data_label = base.find_param("AP_DataLabel");
self.params.data_select = base.find_param("AP_DataSelect");
self.params.attribute = base.find_param("AP_Attribute");
self.params.reset = base.find_param("AP_Reset");
self.params.npts = base.find_param("AP_NPts");
Ok(())
}
fn on_param_change(
&mut self,
reason: usize,
params: &PluginParamSnapshot,
) -> ParamChangeResult {
if Some(reason) == self.params.data_select {
let block = params.addr as usize;
let value = params.value.as_i32();
if self.set_data_select(block, value).is_ok() {
return ParamChangeResult::updates(self.build_updates());
}
} else if Some(reason) == self.params.reset {
if params.value.as_i32() != 0 {
self.reset();
return ParamChangeResult::updates(self.build_updates());
}
}
ParamChangeResult::updates(vec![])
}
}
#[cfg(test)]
mod tests {
use super::*;
use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
use ad_core_rs::ndarray::{NDDataType, NDDimension};
fn make_array_with_attrs(uid: i32, attrs: &[(&str, f64)]) -> NDArray {
let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr.unique_id = uid;
for (name, value) in attrs {
arr.attributes.add(NDAttribute::new_static(
*name,
String::new(),
NDAttrSource::Driver,
NDAttrValue::Float64(*value),
));
}
arr
}
#[test]
fn test_attribute_auto_detection() {
let mut proc = AttrPlotProcessor::new(8, 100, 4);
let pool = NDArrayPool::new(1_000_000);
let mut arr = make_array_with_attrs(1, &[("Temp", 25.0), ("Gain", 1.5)]);
arr.attributes.add(NDAttribute::new_static(
"Label",
String::new(),
NDAttrSource::Driver,
NDAttrValue::String("test".to_string()),
));
proc.process_array(&arr, &pool);
assert_eq!(proc.num_attributes(), 2);
assert_eq!(proc.attributes()[0], "Gain");
assert_eq!(proc.attributes()[1], "Temp");
}
#[test]
fn test_n_attributes_caps_tracked_count() {
let mut proc = AttrPlotProcessor::new(2, 100, 1);
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attrs(1, &[("D", 4.0), ("A", 1.0), ("C", 3.0), ("B", 2.0)]);
proc.process_array(&arr, &pool);
assert_eq!(proc.num_attributes(), 2);
assert_eq!(proc.attributes(), &["A", "B"]);
}
#[test]
fn test_data_select_maps_block_to_attribute() {
let mut proc = AttrPlotProcessor::new(8, 100, 2);
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attrs(1, &[("A", 10.0), ("B", 20.0), ("C", 30.0)]);
proc.process_array(&arr, &pool);
proc.set_data_select(0, 1).unwrap(); proc.set_data_select(1, ATTRPLOT_UID_INDEX).unwrap();
assert_eq!(proc.data_label(0), "B");
assert_eq!(proc.data_label(1), ATTRPLOT_UID_LABEL);
let wf0 = proc.block_waveform(0);
assert!((wf0[0] - 20.0).abs() < 1e-10, "block 0 plots attribute B");
let wf1 = proc.block_waveform(1);
assert!((wf1[0] - 1.0).abs() < 1e-10, "block 1 plots UID");
}
#[test]
fn test_data_select_rejects_out_of_range() {
let mut proc = AttrPlotProcessor::new(8, 100, 2);
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attrs(1, &[("A", 1.0)]);
proc.process_array(&arr, &pool);
assert!(proc.set_data_select(0, 1).is_err());
assert!(proc.set_data_select(5, 0).is_err());
assert!(proc.set_data_select(0, 0).is_ok());
assert!(proc.set_data_select(1, ATTRPLOT_UID_INDEX).is_ok());
}
#[test]
fn test_unbound_block_label_is_none() {
let mut proc = AttrPlotProcessor::new(8, 100, 3);
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attrs(1, &[("A", 1.0)]);
proc.process_array(&arr, &pool);
assert_eq!(proc.data_label(2), ATTRPLOT_NONE_LABEL);
assert_eq!(proc.data_select(2), Some(ATTRPLOT_NONE_INDEX));
}
#[test]
fn test_npts_tracks_point_count() {
let mut proc = AttrPlotProcessor::new(8, 100, 1);
let pool = NDArrayPool::new(1_000_000);
for i in 1..=4 {
let arr = make_array_with_attrs(i, &[("X", i as f64)]);
proc.process_array(&arr, &pool);
}
assert_eq!(proc.uid_buffer().len(), 4);
}
#[test]
fn test_waveform_padded_to_cache_size() {
let mut proc = AttrPlotProcessor::new(8, 6, 1);
let pool = NDArrayPool::new(1_000_000);
for i in 1..=3 {
let arr = make_array_with_attrs(i, &[("X", i as f64 * 10.0)]);
proc.process_array(&arr, &pool);
}
proc.set_data_select(0, 0).unwrap();
let wf = proc.block_waveform(0);
assert_eq!(wf.len(), 6);
assert!((wf[0] - 10.0).abs() < 1e-10);
assert!((wf[2] - 30.0).abs() < 1e-10);
assert!((wf[3] - 30.0).abs() < 1e-10);
assert!((wf[5] - 30.0).abs() < 1e-10);
}
#[test]
fn test_data_select_preserved_across_rebuild() {
let mut proc = AttrPlotProcessor::new(8, 100, 1);
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attrs(5, &[("Gain", 1.0), ("Temp", 25.0)]);
proc.process_array(&arr, &pool);
let temp_idx = proc.find_attribute("Temp").unwrap() as i32;
proc.set_data_select(0, temp_idx).unwrap();
let arr2 = make_array_with_attrs(1, &[("Gain", 2.0), ("Temp", 99.0)]);
proc.process_array(&arr2, &pool);
assert_eq!(proc.data_label(0), "Temp");
let wf = proc.block_waveform(0);
assert!((wf[0] - 99.0).abs() < 1e-10);
}
#[test]
fn test_value_tracking() {
let mut proc = AttrPlotProcessor::new(8, 100, 1);
let pool = NDArrayPool::new(1_000_000);
for i in 1..=5 {
let arr = make_array_with_attrs(i, &[("Value", i as f64 * 10.0)]);
proc.process_array(&arr, &pool);
}
let idx = proc.find_attribute("Value").unwrap();
let buf = proc.buffer(idx).unwrap();
assert_eq!(buf.len(), 5);
assert!((buf[0] - 10.0).abs() < 1e-10);
assert!((buf[4] - 50.0).abs() < 1e-10);
}
#[test]
fn test_circular_buffer_cache_size() {
let mut proc = AttrPlotProcessor::new(8, 3, 1);
let pool = NDArrayPool::new(1_000_000);
for i in 1..=5 {
let arr = make_array_with_attrs(i, &[("Val", i as f64)]);
proc.process_array(&arr, &pool);
}
let idx = proc.find_attribute("Val").unwrap();
let buf = proc.buffer(idx).unwrap();
assert_eq!(buf.len(), 3);
assert!((buf[0] - 3.0).abs() < 1e-10);
assert!((buf[2] - 5.0).abs() < 1e-10);
}
#[test]
fn test_uid_decrease_resets_buffers() {
let mut proc = AttrPlotProcessor::new(8, 100, 1);
let pool = NDArrayPool::new(1_000_000);
for i in 1..=5 {
let arr = make_array_with_attrs(i, &[("X", i as f64)]);
proc.process_array(&arr, &pool);
}
let idx = proc.find_attribute("X").unwrap();
assert_eq!(proc.buffer(idx).unwrap().len(), 5);
let arr = make_array_with_attrs(1, &[("X", 100.0)]);
proc.process_array(&arr, &pool);
let buf = proc.buffer(idx).unwrap();
assert_eq!(buf.len(), 1);
assert!((buf[0] - 100.0).abs() < 1e-10);
}
#[test]
fn test_missing_attribute_uses_nan() {
let mut proc = AttrPlotProcessor::new(8, 100, 1);
let pool = NDArrayPool::new(1_000_000);
let arr1 = make_array_with_attrs(1, &[("Temp", 25.0)]);
proc.process_array(&arr1, &pool);
let mut arr2 = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
arr2.unique_id = 2;
proc.process_array(&arr2, &pool);
let idx = proc.find_attribute("Temp").unwrap();
let buf = proc.buffer(idx).unwrap();
assert_eq!(buf.len(), 2);
assert!((buf[0] - 25.0).abs() < 1e-10);
assert!(buf[1].is_nan());
}
#[test]
fn test_manual_reset() {
let mut proc = AttrPlotProcessor::new(8, 100, 1);
let pool = NDArrayPool::new(1_000_000);
let arr = make_array_with_attrs(5, &[("A", 1.0), ("B", 2.0)]);
proc.process_array(&arr, &pool);
assert_eq!(proc.num_attributes(), 2);
proc.reset();
let arr2 = make_array_with_attrs(1, &[("C", 3.0)]);
proc.process_array(&arr2, &pool);
assert_eq!(proc.num_attributes(), 1);
assert_eq!(proc.attributes()[0], "C");
}
#[test]
fn test_unlimited_buffer() {
let mut proc = AttrPlotProcessor::new(8, 0, 1);
let pool = NDArrayPool::new(1_000_000);
for i in 1..=100 {
let arr = make_array_with_attrs(i, &[("X", i as f64)]);
proc.process_array(&arr, &pool);
}
let idx = proc.find_attribute("X").unwrap();
assert_eq!(proc.buffer(idx).unwrap().len(), 100);
}
#[test]
fn test_plugin_type() {
let proc = AttrPlotProcessor::new(8, 100, 1);
assert_eq!(proc.plugin_type(), "NDPluginAttrPlot");
}
}