use std::sync::Arc;
use ad_core_rs::ndarray::{NDArray, NDDataBuffer, NDDataType};
use ad_core_rs::ndarray_pool::NDArrayPool;
use ad_core_rs::plugin::runtime::{NDPluginProcess, ProcessResult};
use parking_lot::Mutex;
use epics_bridge_rs::qsrv::PvaPvHandle;
use epics_pva_rs::nt::nd_array::{
NdAlarm, NdArrayBuffer, NdAttribute, NdCodec, NdDimension, NdTimeStamp, NtNdArray,
nt_nd_array_desc, nt_nd_array_value,
};
use epics_pva_rs::pvdata::{PvField, ScalarValue, VariantValue};
pub struct PvaProcessor {
pv_name: String,
handle: PvaPvHandle,
}
impl PvaProcessor {
pub fn new(pv_name: String) -> Self {
Self {
pv_name,
handle: PvaPvHandle::new(Some(nt_nd_array_desc())),
}
}
pub fn handle(&self) -> PvaPvHandle {
self.handle.clone()
}
}
impl Default for PvaProcessor {
fn default() -> Self {
Self::new(String::new())
}
}
impl NDPluginProcess for PvaProcessor {
fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
let payload = ndarray_to_pv_field(array);
if let Err(err) = self.handle.post(payload) {
tracing::warn!(
pv = %self.pv_name,
error = %err,
"dropping NDArray frame that does not match the NTNDArray descriptor"
);
}
ProcessResult::arrays(vec![Arc::new(array.clone())])
}
fn plugin_type(&self) -> &str {
"NDPluginPva"
}
fn register_params(
&mut self,
base: &mut asyn_rs::port::PortDriverBase,
) -> asyn_rs::error::AsynResult<()> {
let idx = base.create_param("PV_NAME", asyn_rs::param::ParamType::Octet)?;
base.set_string_param(idx, 0, self.pv_name.clone())?;
Ok(())
}
fn array_data_handle(&self) -> Option<Arc<Mutex<Option<Arc<NDArray>>>>> {
None
}
}
fn ndarray_to_pv_field(array: &NDArray) -> PvField {
let original_type = crate::codec::original_data_type(array);
let num_elements: i64 = if array.dims.is_empty() {
0
} else {
array.dims.iter().map(|d| d.size as i64).product()
};
let uncompressed_size = num_elements * original_type.element_size() as i64;
let codec_parameters = Some(VariantValue::scalar(ScalarValue::Int(
nd_data_type_to_scalar(original_type),
)));
let (value, compressed_size, codec_name) = match &array.codec {
Some(c) => (
NdArrayBuffer::UByte(array.data.as_u8_slice().to_vec()),
c.compressed_size as i64,
codec_name_to_string(c.name),
),
None => (
ndbuffer_to_buffer(&array.data),
uncompressed_size,
String::new(),
),
};
let codec = NdCodec {
name: codec_name,
parameters: codec_parameters,
};
let dimension: Vec<NdDimension> = array
.dims
.iter()
.map(|d| NdDimension {
size: d.size as i32,
offset: d.offset as i32,
full_size: d.size as i32,
binning: d.binning.max(1) as i32,
reverse: d.reverse,
})
.collect();
let data_time_stamp = double_ts_to_nt(array.time_stamp);
let time_stamp = epics_ts_to_nt(&array.timestamp);
let attribute: Vec<NdAttribute> = array
.attributes
.iter()
.map(|a| NdAttribute {
name: a.name.clone(),
value: attribute_value_to_variant(&a.value),
tags: Vec::new(),
descriptor: a.description.clone(),
alarm: NdAlarm::default(),
time_stamp: NdTimeStamp::default(),
source_type: ndattr_source_type(&a.source),
source: a.source.source_string().to_string(),
})
.collect();
let nt = NtNdArray {
value,
codec,
compressed_size,
uncompressed_size,
dimension,
unique_id: array.unique_id,
data_time_stamp,
attribute,
alarm: NdAlarm {
severity: 0,
status: 0,
message: "NO_ALARM".into(),
},
time_stamp,
};
nt_nd_array_value(&nt)
}
fn ndbuffer_to_buffer(buf: &NDDataBuffer) -> NdArrayBuffer {
match buf {
NDDataBuffer::I8(v) => NdArrayBuffer::Byte(v.clone()),
NDDataBuffer::U8(v) => NdArrayBuffer::UByte(v.clone()),
NDDataBuffer::I16(v) => NdArrayBuffer::Short(v.clone()),
NDDataBuffer::U16(v) => NdArrayBuffer::UShort(v.clone()),
NDDataBuffer::I32(v) => NdArrayBuffer::Int(v.clone()),
NDDataBuffer::U32(v) => NdArrayBuffer::UInt(v.clone()),
NDDataBuffer::I64(v) => NdArrayBuffer::Long(v.clone()),
NDDataBuffer::U64(v) => NdArrayBuffer::ULong(v.clone()),
NDDataBuffer::F32(v) => NdArrayBuffer::Float(v.clone()),
NDDataBuffer::F64(v) => NdArrayBuffer::Double(v.clone()),
}
}
fn epics_ts_to_nt(ts: &ad_core_rs::timestamp::EpicsTimestamp) -> NdTimeStamp {
NdTimeStamp {
seconds_past_epoch: ts.sec as i64 + ad_core_rs::timestamp::EPICS_EPOCH_OFFSET as i64,
nanoseconds: ts.nsec as i32,
user_tag: 0,
}
}
fn double_ts_to_nt(t: f64) -> NdTimeStamp {
let seconds = t.floor();
let nanoseconds = ((t - seconds) * 1e9) as i32;
NdTimeStamp {
seconds_past_epoch: seconds as i64 + ad_core_rs::timestamp::EPICS_EPOCH_OFFSET as i64,
nanoseconds,
user_tag: 0,
}
}
fn codec_name_to_string(name: ad_core_rs::codec::CodecName) -> String {
name.as_str().to_string()
}
fn nd_data_type_to_scalar(dt: NDDataType) -> i32 {
match dt {
NDDataType::Int8 => 1, NDDataType::UInt8 => 5, NDDataType::Int16 => 2, NDDataType::UInt16 => 6, NDDataType::Int32 => 3, NDDataType::UInt32 => 7, NDDataType::Int64 => 4, NDDataType::UInt64 => 8, NDDataType::Float32 => 9, NDDataType::Float64 => 10, }
}
fn ndattr_source_type(src: &ad_core_rs::attributes::NDAttrSource) -> i32 {
use ad_core_rs::attributes::NDAttrSource;
match src {
NDAttrSource::Driver => 0,
NDAttrSource::Param { .. } => 1,
NDAttrSource::EpicsPV(_) => 2,
NDAttrSource::Function(_) => 3,
NDAttrSource::Constant(_) => 4,
NDAttrSource::Undefined => 5,
}
}
fn attribute_value_to_variant(val: &ad_core_rs::attributes::NDAttrValue) -> VariantValue {
use ad_core_rs::attributes::NDAttrValue;
let scalar = match val {
NDAttrValue::Int8(v) => ScalarValue::Byte(*v),
NDAttrValue::UInt8(v) => ScalarValue::UByte(*v),
NDAttrValue::Int16(v) => ScalarValue::Short(*v),
NDAttrValue::UInt16(v) => ScalarValue::UShort(*v),
NDAttrValue::Int32(v) => ScalarValue::Int(*v),
NDAttrValue::UInt32(v) => ScalarValue::UInt(*v),
NDAttrValue::Int64(v) => ScalarValue::Long(*v),
NDAttrValue::UInt64(v) => ScalarValue::ULong(*v),
NDAttrValue::Float32(v) => ScalarValue::Float(*v),
NDAttrValue::Float64(v) => ScalarValue::Double(*v),
NDAttrValue::String(v) => ScalarValue::String(v.clone().into()),
NDAttrValue::Undefined => return VariantValue::null(),
};
VariantValue::scalar(scalar)
}
#[cfg(test)]
mod tests {
use super::*;
use ad_core_rs::ndarray::{NDDataType, NDDimension};
#[test]
fn convert_simple_array() {
let mut arr = NDArray::new(
vec![NDDimension::new(4), NDDimension::new(4)],
NDDataType::UInt8,
);
arr.unique_id = 42;
if let NDDataBuffer::U8(ref mut buf) = arr.data {
for (i, v) in buf.iter_mut().enumerate() {
*v = i as u8;
}
}
let payload = ndarray_to_pv_field(&arr);
match &payload {
PvField::Structure(s) => {
assert_eq!(s.struct_id, "epics:nt/NTNDArray:1.0");
assert!(s.get_field("value").is_some());
assert!(s.get_field("dimension").is_some());
}
_ => panic!("expected structure"),
}
}
#[test]
fn compressed_array_emits_ubyte_value_and_codec_parameters() {
let mut arr = NDArray::new(vec![NDDimension::new(8)], NDDataType::UInt16);
if let NDDataBuffer::U16(ref mut buf) = arr.data {
for (i, v) in buf.iter_mut().enumerate() {
*v = (i * 1000) as u16;
}
}
let uncompressed_bytes = (arr.data.len() * 2) as i64;
let compressed = crate::codec::compress_lz4(&arr);
let comp_size = compressed.codec.as_ref().unwrap().compressed_size as i64;
assert!(
matches!(compressed.data, NDDataBuffer::U8(_)),
"compressed buffer must hold raw bytes"
);
let payload = ndarray_to_pv_field(&compressed);
let PvField::Structure(s) = &payload else {
panic!("expected structure, got {payload:?}");
};
let Some(PvField::Union {
variant_name,
value,
..
}) = s.get_field("value")
else {
panic!("expected value union");
};
assert_eq!(variant_name, "ubyteValue");
match value.as_ref() {
PvField::ScalarArray(items) => {
assert_eq!(items.len() as i64, comp_size);
assert!(matches!(items.first(), Some(ScalarValue::UByte(_))));
}
other => panic!("expected ubyte scalar array, got {other:?}"),
}
assert_eq!(
s.get_field("compressedSize"),
Some(&PvField::Scalar(ScalarValue::Long(comp_size)))
);
assert_eq!(
s.get_field("uncompressedSize"),
Some(&PvField::Scalar(ScalarValue::Long(uncompressed_bytes)))
);
let Some(PvField::Structure(codec)) = s.get_field("codec") else {
panic!("expected codec structure");
};
assert_eq!(
codec.get_field("name"),
Some(&PvField::Scalar(ScalarValue::String("lz4".into())))
);
let Some(PvField::Variant(params)) = codec.get_field("parameters") else {
panic!("expected codec.parameters variant");
};
assert_eq!(params.value, PvField::Scalar(ScalarValue::Int(6)));
assert!(
params.desc.is_some(),
"codec.parameters must carry a descriptor (non-null variant)"
);
}
#[test]
fn compressed_frame_attribute_list_omits_codec_carrier() {
use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
let mut arr = NDArray::new(vec![NDDimension::new(8)], NDDataType::UInt16);
if let NDDataBuffer::U16(ref mut buf) = arr.data {
for (i, v) in buf.iter_mut().enumerate() {
*v = (i * 1000) as u16;
}
}
arr.attributes.add(NDAttribute::new_static(
"ColorMode",
"Color Mode",
NDAttrSource::Driver,
NDAttrValue::Int32(0),
));
let compressed = crate::codec::compress_lz4(&arr);
assert_eq!(
compressed.codec.as_ref().unwrap().original_data_type,
NDDataType::UInt16,
"precondition: the original element type is recorded in the codec"
);
assert!(
compressed
.attributes
.get("CODEC_ORIGINAL_DATA_TYPE")
.is_none(),
"precondition: no carrier attribute is attached to the array"
);
let payload = ndarray_to_pv_field(&compressed);
let PvField::Structure(s) = &payload else {
panic!("expected structure, got {payload:?}");
};
let Some(PvField::StructureArray(attrs)) = s.get_field("attribute") else {
panic!("expected attribute structure-array");
};
let names: Vec<String> = attrs
.iter()
.filter_map(|a| a.as_ref())
.filter_map(|a| match a.get_field("name") {
Some(PvField::Scalar(ScalarValue::String(v))) => {
Some(v.as_str_lossy().into_owned())
}
_ => None,
})
.collect();
assert!(
!names.iter().any(|n| n == "CODEC_ORIGINAL_DATA_TYPE"),
"internal codec carrier must not appear in NTNDArray attribute[]; got {names:?}"
);
assert!(
names.iter().any(|n| n == "ColorMode"),
"genuine driver attributes must still be published; got {names:?}"
);
}
#[test]
fn processor_stores_latest() {
let mut proc = PvaProcessor::new("TEST:Pva1:Image".into());
let pool = NDArrayPool::new(1_000_000);
let arr = NDArray::new(vec![NDDimension::new(8)], NDDataType::Float64);
proc.process_array(&arr, &pool);
assert!(proc.handle().current_value().is_some());
}
#[test]
fn attribute_values_fill_any_slot() {
use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
let mut arr = NDArray::new(vec![NDDimension::new(2)], NDDataType::UInt8);
arr.attributes.add(NDAttribute::new_static(
"gain",
"detector gain",
NDAttrSource::Constant(String::new()),
NDAttrValue::Int32(7),
));
arr.attributes.add(NDAttribute::new_static(
"missing",
"undefined attr",
NDAttrSource::Undefined,
NDAttrValue::Undefined,
));
let payload = ndarray_to_pv_field(&arr);
let PvField::Structure(s) = &payload else {
panic!("expected structure, got {payload:?}");
};
let Some(PvField::StructureArray(attrs)) = s.get_field("attribute") else {
panic!("expected attribute structure-array");
};
assert_eq!(attrs.len(), 2);
let gain = attrs[0].as_ref().expect("gain attribute present");
match gain.get_field("value") {
Some(PvField::Variant(vv)) => {
assert_eq!(vv.value, PvField::Scalar(ScalarValue::Int(7)));
assert!(vv.desc.is_some(), "defined value must carry a descriptor");
}
other => panic!("expected variant gain value, got {other:?}"),
}
let missing = attrs[1].as_ref().expect("undefined attribute present");
match missing.get_field("value") {
Some(PvField::Variant(vv)) => {
assert_eq!(vv.value, PvField::Null);
assert!(
vv.desc.is_none(),
"undefined value must be the null variant"
);
}
other => panic!("expected null variant value, got {other:?}"),
}
}
#[test]
fn attribute_timestamp_stays_default_not_image_ts() {
use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
use epics_pva_rs::pvdata::PvStructure;
let mut arr = NDArray::new(vec![NDDimension::new(2)], NDDataType::UInt8);
arr.time_stamp = 1234.0;
arr.timestamp.sec = 1234;
arr.timestamp.nsec = 567;
arr.attributes.add(NDAttribute::new_static(
"gain",
"detector gain",
NDAttrSource::Constant(String::new()),
NDAttrValue::Int32(7),
));
let payload = ndarray_to_pv_field(&arr);
let PvField::Structure(s) = &payload else {
panic!("expected structure, got {payload:?}");
};
let read_ts = |st: &PvStructure, field: &str| -> (i64, i32) {
let Some(PvField::Structure(ts)) = st.get_field(field) else {
panic!("expected {field} time_t structure");
};
let sec = match ts.get_field("secondsPastEpoch") {
Some(PvField::Scalar(ScalarValue::Long(v))) => *v,
other => panic!("expected secondsPastEpoch Long, got {other:?}"),
};
let nsec = match ts.get_field("nanoseconds") {
Some(PvField::Scalar(ScalarValue::Int(v))) => *v,
other => panic!("expected nanoseconds Int, got {other:?}"),
};
(sec, nsec)
};
let offset = ad_core_rs::timestamp::EPICS_EPOCH_OFFSET as i64;
assert_eq!(read_ts(s, "dataTimeStamp"), (1234 + offset, 0));
assert_eq!(read_ts(s, "timeStamp"), (1234 + offset, 567));
let Some(PvField::StructureArray(attrs)) = s.get_field("attribute") else {
panic!("expected attribute structure-array");
};
let attr0 = attrs[0].as_ref().expect("attribute present");
assert_eq!(
read_ts(attr0, "timeStamp"),
(0, 0),
"per-attribute timeStamp must stay default, not the image timestamp"
);
}
#[test]
fn attribute_source_string_and_type_match_c() {
use ad_core_rs::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
let mut arr = NDArray::new(vec![NDDimension::new(2)], NDDataType::UInt8);
arr.attributes.add(NDAttribute::new_static(
"FromDriver",
"driver attr",
NDAttrSource::Driver,
NDAttrValue::Int32(0),
));
arr.attributes.add(NDAttribute::new_static(
"Counter",
"param attr",
NDAttrSource::Param {
port_name: "SIM1".into(),
param_name: "ARRAY_COUNTER".into(),
},
NDAttrValue::Int32(0),
));
arr.attributes.add(NDAttribute::new_static(
"Temp",
"epics pv attr",
NDAttrSource::EpicsPV("13SIM1:cam1:Temperature".into()),
NDAttrValue::Float64(0.0),
));
arr.attributes.add(NDAttribute::new_static(
"Computed",
"function attr",
NDAttrSource::Function("my_func".into()),
NDAttrValue::Int32(0),
));
arr.attributes.add(NDAttribute::new_static(
"Label",
"const attr",
NDAttrSource::Constant("a constant".into()),
NDAttrValue::String("a constant".into()),
));
let payload = ndarray_to_pv_field(&arr);
let PvField::Structure(s) = &payload else {
panic!("expected structure, got {payload:?}");
};
let Some(PvField::StructureArray(attrs)) = s.get_field("attribute") else {
panic!("expected attribute structure-array");
};
assert_eq!(attrs.len(), 5);
let src_str = |i: usize| -> String {
let a = attrs[i].as_ref().expect("attribute present");
match a.get_field("source") {
Some(PvField::Scalar(ScalarValue::String(v))) => v.as_str_lossy().into_owned(),
other => panic!("expected source string, got {other:?}"),
}
};
let src_type = |i: usize| -> i32 {
let a = attrs[i].as_ref().expect("attribute present");
match a.get_field("sourceType") {
Some(PvField::Scalar(ScalarValue::Int(v))) => *v,
other => panic!("expected sourceType int, got {other:?}"),
}
};
assert_eq!(src_str(0), "Driver");
assert_eq!(src_type(0), 0);
assert_eq!(src_str(1), "ARRAY_COUNTER");
assert_eq!(src_type(1), 1);
assert_eq!(src_str(2), "13SIM1:cam1:Temperature");
assert_eq!(src_type(2), 2);
assert_eq!(src_str(3), "my_func");
assert_eq!(src_type(3), 3);
assert_eq!(src_str(4), "a constant");
assert_eq!(src_type(4), 4);
}
#[test]
fn nt_timestamps_use_distinct_sources_and_posix_offset() {
use epics_pva_rs::pvdata::PvStructure;
let mut arr = NDArray::new(vec![NDDimension::new(2)], NDDataType::UInt8);
arr.time_stamp = 10.25;
arr.timestamp.sec = 1000;
arr.timestamp.nsec = 42;
let payload = ndarray_to_pv_field(&arr);
let PvField::Structure(s) = &payload else {
panic!("expected structure, got {payload:?}");
};
let read_ts = |st: &PvStructure, field: &str| -> (i64, i32) {
let Some(PvField::Structure(ts)) = st.get_field(field) else {
panic!("expected {field} time_t structure");
};
let sec = match ts.get_field("secondsPastEpoch") {
Some(PvField::Scalar(ScalarValue::Long(v))) => *v,
other => panic!("expected secondsPastEpoch Long, got {other:?}"),
};
let nsec = match ts.get_field("nanoseconds") {
Some(PvField::Scalar(ScalarValue::Int(v))) => *v,
other => panic!("expected nanoseconds Int, got {other:?}"),
};
(sec, nsec)
};
let offset = ad_core_rs::timestamp::EPICS_EPOCH_OFFSET as i64;
assert_eq!(read_ts(s, "dataTimeStamp"), (10 + offset, 250_000_000));
assert_eq!(read_ts(s, "timeStamp"), (1000 + offset, 42));
}
}