use std::path::Path;
use std::sync::Arc;
use asyn_rs::error::AsynResult;
use asyn_rs::port::{PortDriverBase, PortFlags};
use crate::attributes::{
EpicsPvAttributeSource, FunctionAttributeSource, NDAttrSource, NDAttrValue, NDAttribute,
NDAttributeFunctionRegistry, ParamAttributeSource,
};
use crate::ndarray::NDArray;
use crate::ndarray_pool::NDArrayPool;
use crate::params::ndarray_driver::NDArrayDriverParams;
use crate::plugin::channel::{NDArrayOutput, NDArraySender, QueuedArrayCounter};
pub const ATTR_STATUS_OK: i32 = 0;
pub const ATTR_STATUS_FILE_NOT_FOUND: i32 = 1;
pub const ATTR_STATUS_XML_SYNTAX_ERROR: i32 = 2;
pub const ATTR_STATUS_MACRO_ERROR: i32 = 3;
fn xml_attr<'a>(tag: &'a str, key: &str) -> Option<&'a str> {
let pat = format!("{key}=\"");
let start = tag.find(&pat)? + pat.len();
let end = tag[start..].find('"')? + start;
Some(&tag[start..end])
}
fn parse_attributes_xml(
xml: &str,
registry: &std::sync::Arc<NDAttributeFunctionRegistry>,
) -> Result<Vec<NDAttribute>, String> {
if !xml.contains("<Attributes>") {
return Err("missing <Attributes> root element".into());
}
let mut out = Vec::new();
let mut rest = xml;
while let Some(open) = rest.find("<Attribute ") {
let after = &rest[open + 1..];
let close = after
.find("/>")
.or_else(|| after.find('>'))
.ok_or_else(|| "unterminated <Attribute> tag".to_string())?;
let tag = &after[..close];
let name = xml_attr(tag, "name")
.ok_or_else(|| "Attribute missing name".to_string())?
.to_string();
let description = xml_attr(tag, "description").unwrap_or("").to_string();
let source_str =
xml_attr(tag, "source").ok_or_else(|| format!("Attribute {name} missing source"))?;
let attr_type = xml_attr(tag, "type").unwrap_or("EPICS_PV");
let attr = match attr_type.to_ascii_uppercase().as_str() {
"EPICS_PV" => {
let src = EpicsPvAttributeSource::new(source_str);
NDAttribute::new_with_source(name, description, NDAttrSource::EpicsPV, src)
}
"PARAM" => {
let addr = xml_attr(tag, "addr")
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(0);
let src = ParamAttributeSource::new(source_str, addr);
NDAttribute::new_with_source(
name,
description,
NDAttrSource::Param {
port_name: String::new(),
param_name: source_str.to_string(),
},
src,
)
}
"FUNCTION" => {
let func_param = xml_attr(tag, "param").unwrap_or("");
let src = FunctionAttributeSource::new(registry.clone(), source_str, func_param);
NDAttribute::new_with_source(name, description, NDAttrSource::Function, src)
}
"CONST" => NDAttribute::new_static(
name,
description,
NDAttrSource::Constant,
NDAttrValue::String(source_str.to_string()),
),
other => {
return Err(format!(
"unknown attribute type '{other}' for attribute {name}"
));
}
};
out.push(attr);
rest = &after[close..];
}
Ok(out)
}
fn sprintf_template(template: &str, path: &str, name: &str, number: i32) -> String {
let mut result = String::with_capacity(template.len() + path.len() + name.len() + 16);
let mut chars = template.chars().peekable();
let mut string_arg_idx = 0;
while let Some(ch) = chars.next() {
if ch == '%' {
let mut spec = String::new();
while let Some(&c) = chars.peek() {
if c == 's' || c == 'd' || c == 'i' || c == 'o' || c == 'x' || c == 'X' {
break;
}
if c == '%' {
break;
}
spec.push(c);
chars.next();
}
match chars.next() {
Some('s') => {
let s = if string_arg_idx == 0 { path } else { name };
string_arg_idx += 1;
result.push_str(s);
}
Some('d') | Some('i') => {
let formatted = format_int_spec(&spec, number);
result.push_str(&formatted);
}
Some('%') => {
result.push('%');
}
Some(c) => {
result.push('%');
result.push_str(&spec);
result.push(c);
}
None => {
result.push('%');
result.push_str(&spec);
}
}
} else {
result.push(ch);
}
}
result
}
fn format_int_spec(spec: &str, value: i32) -> String {
if spec.is_empty() {
return value.to_string();
}
let zero_flag = spec.starts_with('0');
let spec_clean = if zero_flag { &spec[1..] } else { spec };
let (width_str, prec_str) = if let Some(dot_pos) = spec_clean.find('.') {
(&spec_clean[..dot_pos], Some(&spec_clean[dot_pos + 1..]))
} else {
(spec_clean, None)
};
let width: usize = width_str.parse().unwrap_or(0);
let has_precision = prec_str.is_some();
let precision: usize = prec_str.and_then(|s| s.parse().ok()).unwrap_or(0);
let negative = value < 0;
let digits = value.unsigned_abs().to_string();
let digits = if digits.len() < precision {
format!("{}{}", "0".repeat(precision - digits.len()), digits)
} else {
digits
};
let body = if negative {
format!("-{digits}")
} else {
digits
};
if body.len() >= width {
body
} else if zero_flag && !has_precision {
let pad = width - body.len();
if negative {
format!("-{}{}", "0".repeat(pad), &body[1..])
} else {
format!("{}{}", "0".repeat(pad), body)
}
} else {
format!("{}{}", " ".repeat(width - body.len()), body)
}
}
pub(crate) fn write_array_params(
port_base: &mut PortDriverBase,
params: &NDArrayDriverParams,
array: &NDArray,
) -> AsynResult<()> {
let info = array.info();
port_base.set_int32_param(params.array_size_x, 0, info.x_size as i32)?;
port_base.set_int32_param(params.array_size_y, 0, info.y_size as i32)?;
port_base.set_int32_param(params.array_size_z, 0, info.color_size as i32)?;
port_base.set_int32_param(params.array_size, 0, info.total_bytes as i32)?;
port_base.set_int32_param(params.unique_id, 0, array.unique_id)?;
port_base.set_int32_param(params.n_dimensions, 0, array.dims.len() as i32)?;
let dim_sizes: Vec<i32> = array.dims.iter().map(|d| d.size as i32).collect();
port_base
.params
.set_int32_array(params.array_dimensions, 0, dim_sizes)?;
port_base.set_int32_param(params.data_type, 0, array.data.data_type() as i32)?;
port_base.set_int32_param(params.color_mode, 0, info.color_mode as i32)?;
if let Some(bp) = array
.attributes
.get("bayerPattern")
.and_then(|a| a.value.as_i64())
{
let pattern = crate::color::NDBayerPattern::from_i32(bp as i32);
port_base.set_int32_param(params.bayer_pattern, 0, pattern.as_i32())?;
}
port_base.set_float64_param(params.timestamp_rbv, 0, array.time_stamp)?;
port_base.set_int32_param(params.epics_ts_sec, 0, array.timestamp.sec as i32)?;
port_base.set_int32_param(params.epics_ts_nsec, 0, array.timestamp.nsec as i32)?;
match &array.codec {
Some(codec) => {
port_base.set_string_param(params.codec, 0, codec.name.as_str().into())?;
port_base.set_int32_param(params.compressed_size, 0, codec.compressed_size as i32)?;
}
None => {
port_base.set_string_param(params.codec, 0, String::new())?;
port_base.set_int32_param(params.compressed_size, 0, info.total_bytes as i32)?;
}
}
Ok(())
}
pub(crate) fn refresh_pool_stats(
port_base: &mut PortDriverBase,
params: &NDArrayDriverParams,
pool: &NDArrayPool,
) -> AsynResult<()> {
const MEGABYTE: f64 = 1_048_576.0;
port_base.set_float64_param(
params.pool_max_memory,
0,
pool.max_memory() as f64 / MEGABYTE,
)?;
port_base.set_float64_param(
params.pool_used_memory,
0,
pool.allocated_bytes() as f64 / MEGABYTE,
)?;
port_base.set_int32_param(
params.pool_alloc_buffers,
0,
pool.num_alloc_buffers() as i32,
)?;
port_base.set_int32_param(params.pool_free_buffers, 0, pool.num_free_buffers() as i32)?;
Ok(())
}
pub(crate) fn handle_pool_write_int32(
port_base: &mut PortDriverBase,
params: &NDArrayDriverParams,
pool: &NDArrayPool,
param_index: usize,
template_array: Option<&NDArray>,
) -> AsynResult<bool> {
if param_index == params.pool_empty_free_list {
pool.empty_free_list();
refresh_pool_stats(port_base, params, pool)?;
Ok(true)
} else if param_index == params.pool_poll_stats {
refresh_pool_stats(port_base, params, pool)?;
Ok(true)
} else if param_index == params.pool_pre_alloc {
if let Some(template) = template_array {
let count = port_base
.get_int32_param(params.pool_num_pre_alloc_buffers, 0)
.unwrap_or(0)
.max(0) as usize;
pool.pre_allocate_buffers(template, count).map_err(|e| {
asyn_rs::error::AsynError::Status {
status: asyn_rs::error::AsynStatus::Error,
message: e.to_string(),
}
})?;
refresh_pool_stats(port_base, params, pool)?;
}
port_base.set_int32_param(params.pool_pre_alloc, 0, 0)?;
Ok(true)
} else {
Ok(false)
}
}
pub struct NDArrayDriverBase {
pub port_base: PortDriverBase,
pub params: NDArrayDriverParams,
pub pool: Arc<NDArrayPool>,
pub array_output: NDArrayOutput,
pub queued_counter: Arc<QueuedArrayCounter>,
pub last_array: Option<Arc<NDArray>>,
pub attributes: crate::attributes::NDAttributeList,
pub attr_functions: std::sync::Arc<NDAttributeFunctionRegistry>,
}
impl NDArrayDriverBase {
pub fn new(port_name: &str, max_memory: usize) -> AsynResult<Self> {
let mut port_base = PortDriverBase::new(
port_name,
1,
PortFlags {
can_block: true,
..Default::default()
},
);
let params = NDArrayDriverParams::create(&mut port_base)?;
port_base.set_int32_param(params.array_callbacks, 0, 1)?;
port_base.set_float64_param(params.pool_max_memory, 0, max_memory as f64 / 1_048_576.0)?;
let pool = Arc::new(NDArrayPool::new(max_memory));
Ok(Self {
port_base,
params,
pool,
array_output: NDArrayOutput::new(),
queued_counter: Arc::new(QueuedArrayCounter::new()),
last_array: None,
attributes: crate::attributes::NDAttributeList::new(),
attr_functions: NDAttributeFunctionRegistry::new(),
})
}
pub fn connect_downstream(&mut self, mut sender: NDArraySender) {
sender.set_queued_counter(self.queued_counter.clone());
self.array_output.add(sender);
}
pub fn write_int32_pool(&mut self, param_index: usize, _value: i32) -> AsynResult<bool> {
let template = self.last_array.clone();
handle_pool_write_int32(
&mut self.port_base,
&self.params,
&self.pool,
param_index,
template.as_deref(),
)
}
pub fn num_plugins(&self) -> usize {
self.array_output.num_senders()
}
pub fn prepare_array(&mut self, mut array: Arc<NDArray>) -> AsynResult<Option<Arc<NDArray>>> {
let counter = self
.port_base
.get_int32_param(self.params.array_counter, 0)?
+ 1;
self.port_base
.set_int32_param(self.params.array_counter, 0, counter)?;
if !self.attributes.is_empty() {
let fresh = self.update_attributes();
Arc::make_mut(&mut array).attributes.copy_from(&fresh);
}
write_array_params(&mut self.port_base, &self.params, &array)?;
self.last_array = Some(array.clone());
self.port_base.set_float64_param(
self.params.pool_used_memory,
0,
self.pool.allocated_bytes() as f64 / 1_048_576.0,
)?;
self.port_base.set_int32_param(
self.params.pool_free_buffers,
0,
self.pool.num_free_buffers() as i32,
)?;
self.port_base.set_int32_param(
self.params.pool_alloc_buffers,
0,
self.pool.num_alloc_buffers() as i32,
)?;
let callbacks_enabled = self
.port_base
.get_int32_param(self.params.array_callbacks, 0)?
!= 0;
let to_publish = if callbacks_enabled {
self.port_base.set_generic_pointer_param(
self.params.ndarray_data,
0,
array.clone() as Arc<dyn std::any::Any + Send + Sync>,
)?;
Some(array)
} else {
None
};
self.port_base.call_param_callbacks(0)?;
Ok(to_publish)
}
pub fn create_file_name(&mut self) -> AsynResult<String> {
let path = self.port_base.get_string_param(self.params.file_path, 0)?;
let name = self.port_base.get_string_param(self.params.file_name, 0)?;
let number = self.port_base.get_int32_param(self.params.file_number, 0)?;
let template = self
.port_base
.get_string_param(self.params.file_template, 0)?;
let auto_increment = self
.port_base
.get_int32_param(self.params.auto_increment, 0)
.unwrap_or(0);
let full = sprintf_template(template, path, name, number);
self.port_base
.set_string_param(self.params.full_file_name, 0, full.clone())?;
if auto_increment != 0 {
self.port_base
.set_int32_param(self.params.file_number, 0, number + 1)?;
}
Ok(full)
}
pub fn check_path(&mut self) -> AsynResult<bool> {
let path_ref = self.port_base.get_string_param(self.params.file_path, 0)?;
let mut path = path_ref.to_string();
if !path.is_empty() && !path.ends_with('/') && !path.ends_with(std::path::MAIN_SEPARATOR) {
path.push('/');
self.port_base
.set_string_param(self.params.file_path, 0, path.clone())?;
}
let exists = Path::new(&path).is_dir();
self.port_base
.set_int32_param(self.params.file_path_exists, 0, exists as i32)?;
Ok(exists)
}
pub fn create_file_path(path: &str, path_depth: i32) -> AsynResult<()> {
if path_depth == 0 {
return Ok(());
}
let bytes: Vec<char> = path.chars().collect();
let mut i = 0usize;
let mut prefix = String::new();
if bytes.len() >= 2 && bytes[1] == ':' {
prefix.push(bytes[0]);
prefix.push(':');
i = 2;
}
while i < bytes.len() && (bytes[i] == '/' || bytes[i] == '\\') {
prefix.push(bytes[i]);
i += 1;
}
let rest: String = bytes[i..].iter().collect();
let parts: Vec<&str> = rest.split(['/', '\\']).filter(|p| !p.is_empty()).collect();
let num_parts = parts.len() as i32;
let mut depth = path_depth;
if depth < 0 {
depth += num_parts;
if depth < 1 {
depth = 1;
}
}
let mut next_dir = prefix;
for (idx, part) in parts.iter().enumerate() {
next_dir.push_str(part);
if idx as i32 >= depth {
match std::fs::create_dir(&next_dir) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e.into()),
}
}
next_dir.push('/');
}
Ok(())
}
pub fn write_octet(&mut self, param_index: usize, value: &str) -> AsynResult<bool> {
if param_index == self.params.attributes_file
|| param_index == self.params.attributes_macros
{
let _ = self.read_nd_attributes_file();
Ok(true)
} else if param_index == self.params.file_path {
if !self.check_path()? {
let depth = self
.port_base
.get_int32_param(self.params.create_dir, 0)
.unwrap_or(0);
let _ = Self::create_file_path(value, depth);
self.check_path()?;
}
Ok(true)
} else {
Ok(false)
}
}
pub fn read_nd_attributes_file(&mut self) -> AsynResult<()> {
let file_param = self
.port_base
.get_string_param(self.params.attributes_file, 0)?
.to_string();
self.attributes.clear();
if file_param.is_empty() {
self.port_base
.set_int32_param(self.params.attributes_status, 0, ATTR_STATUS_OK)?;
return Ok(());
}
let xml = if file_param.contains("<Attributes>") {
file_param
} else {
match std::fs::read_to_string(&file_param) {
Ok(s) => s,
Err(_) => {
self.port_base.set_int32_param(
self.params.attributes_status,
0,
ATTR_STATUS_FILE_NOT_FOUND,
)?;
return Err(asyn_rs::error::AsynError::Status {
status: asyn_rs::error::AsynStatus::Error,
message: format!("readNDAttributesFile: cannot open {file_param}"),
});
}
}
};
match parse_attributes_xml(&xml, &self.attr_functions) {
Ok(attrs) => {
for attr in attrs {
self.attributes.add(attr);
}
self.port_base
.set_int32_param(self.params.attributes_status, 0, ATTR_STATUS_OK)?;
Ok(())
}
Err(msg) => {
self.port_base.set_int32_param(
self.params.attributes_status,
0,
ATTR_STATUS_XML_SYNTAX_ERROR,
)?;
Err(asyn_rs::error::AsynError::Status {
status: asyn_rs::error::AsynStatus::Error,
message: format!("readNDAttributesFile: {msg}"),
})
}
}
}
pub fn attributes(&self) -> &crate::attributes::NDAttributeList {
&self.attributes
}
pub fn update_attributes(&mut self) -> crate::attributes::NDAttributeList {
for attr in self.attributes.iter() {
if let Some(param_src) = attr.param_source() {
if let Some(value) = self.read_param_value(¶m_src.param_name, param_src.addr) {
param_src.cell().set(value);
}
}
}
self.attributes.update_values();
self.attributes.clone()
}
fn read_param_value(&self, param_name: &str, addr: i32) -> Option<NDAttrValue> {
use asyn_rs::param::ParamType;
let index = self.port_base.params.find_param(param_name)?;
match self.port_base.params.param_type(index)? {
ParamType::Int32 | ParamType::Enum | ParamType::UInt32Digital => self
.port_base
.params
.get_int32(index, addr)
.ok()
.map(NDAttrValue::Int32),
ParamType::Int64 => self
.port_base
.params
.get_int64(index, addr)
.ok()
.map(NDAttrValue::Int64),
ParamType::Float64 => self
.port_base
.params
.get_float64(index, addr)
.ok()
.map(NDAttrValue::Float64),
ParamType::Octet => self
.port_base
.params
.get_string(index, addr)
.ok()
.map(|s| NDAttrValue::String(s.to_string())),
_ => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::plugin::channel::ndarray_channel;
#[test]
fn test_new_sets_callbacks_enabled() {
let drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
assert_eq!(
drv.port_base
.get_int32_param(drv.params.array_callbacks, 0)
.unwrap(),
1,
);
}
#[test]
fn test_prepare_array() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let arr = drv
.pool
.alloc(
vec![
crate::ndarray::NDDimension::new(64),
crate::ndarray::NDDimension::new(64),
],
crate::ndarray::NDDataType::UInt8,
)
.unwrap();
drv.prepare_array(Arc::new(arr)).unwrap();
assert_eq!(
drv.port_base
.get_int32_param(drv.params.array_counter, 0)
.unwrap(),
1,
);
}
#[test]
fn test_prepare_updates_size_info() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let arr = drv
.pool
.alloc(
vec![
crate::ndarray::NDDimension::new(320),
crate::ndarray::NDDimension::new(240),
],
crate::ndarray::NDDataType::UInt16,
)
.unwrap();
drv.prepare_array(Arc::new(arr)).unwrap();
assert_eq!(
drv.port_base
.get_int32_param(drv.params.array_size_x, 0)
.unwrap(),
320,
);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.array_size_y, 0)
.unwrap(),
240,
);
}
#[test]
fn test_create_file_name_empty_template_yields_empty() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
drv.port_base
.set_string_param(drv.params.file_path, 0, "/tmp/".into())
.unwrap();
drv.port_base
.set_string_param(drv.params.file_name, 0, "test_".into())
.unwrap();
drv.port_base
.set_int32_param(drv.params.file_number, 0, 42)
.unwrap();
drv.port_base
.set_string_param(drv.params.file_template, 0, "".into())
.unwrap();
let name = drv.create_file_name().unwrap();
assert_eq!(name, "");
}
#[test]
fn test_create_file_name_standard_template() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
drv.port_base
.set_string_param(drv.params.file_path, 0, "/tmp/".into())
.unwrap();
drv.port_base
.set_string_param(drv.params.file_name, 0, "test".into())
.unwrap();
drv.port_base
.set_int32_param(drv.params.file_number, 0, 42)
.unwrap();
drv.port_base
.set_string_param(drv.params.file_template, 0, "%s%s_%3.3d.dat".into())
.unwrap();
let name = drv.create_file_name().unwrap();
assert_eq!(name, "/tmp/test_042.dat");
}
#[test]
fn test_format_int_spec_width_vs_precision() {
assert_eq!(format_int_spec("3.3", 7), "007");
assert_eq!(format_int_spec("5.3", 42), " 042");
assert_eq!(format_int_spec("04", 7), "0007");
assert_eq!(format_int_spec("5", 7), " 7");
assert_eq!(format_int_spec("", 7), "7");
assert_eq!(format_int_spec("2.5", 12345), "12345");
assert_eq!(format_int_spec("6.3", -4), " -004");
assert_eq!(format_int_spec("05", -4), "-0004");
}
#[test]
fn test_check_path_exists() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
drv.port_base
.set_string_param(drv.params.file_path, 0, "/tmp".into())
.unwrap();
assert!(drv.check_path().unwrap());
}
#[test]
fn test_check_path_not_exists() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
drv.port_base
.set_string_param(drv.params.file_path, 0, "/nonexistent_path_xyz".into())
.unwrap();
assert!(!drv.check_path().unwrap());
}
#[test]
fn test_prepare_array_publishes_dims_type_timestamps() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let mut arr = drv
.pool
.alloc(
vec![
crate::ndarray::NDDimension::new(64),
crate::ndarray::NDDimension::new(48),
],
crate::ndarray::NDDataType::UInt16,
)
.unwrap();
arr.time_stamp = 100.5;
arr.timestamp = crate::timestamp::EpicsTimestamp {
sec: 1234,
nsec: 5678,
};
drv.prepare_array(Arc::new(arr)).unwrap();
assert_eq!(
drv.port_base
.get_int32_param(drv.params.n_dimensions, 0)
.unwrap(),
2
);
let dims = drv
.port_base
.params
.get_int32_array(drv.params.array_dimensions, 0)
.unwrap();
assert_eq!(&dims[..], &[64, 48]);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.data_type, 0)
.unwrap(),
crate::ndarray::NDDataType::UInt16 as i32
);
assert_eq!(
drv.port_base
.get_float64_param(drv.params.timestamp_rbv, 0)
.unwrap(),
100.5
);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.epics_ts_sec, 0)
.unwrap(),
1234
);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.epics_ts_nsec, 0)
.unwrap(),
5678
);
}
#[test]
fn test_prepare_array_publishes_codec_and_bayer() {
use crate::attributes::{NDAttrSource, NDAttrValue, NDAttribute};
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let mut arr = drv
.pool
.alloc(
vec![crate::ndarray::NDDimension::new(16)],
crate::ndarray::NDDataType::UInt8,
)
.unwrap();
arr.codec = Some(crate::codec::Codec {
name: crate::codec::CodecName::BSLZ4,
compressed_size: 9,
level: 0,
shuffle: 0,
compressor: 0,
});
arr.attributes.add(NDAttribute {
name: "bayerPattern".into(),
description: String::new(),
source: NDAttrSource::Driver,
value: NDAttrValue::Int32(crate::color::NDBayerPattern::GRBG as i32),
source_impl: None,
});
drv.prepare_array(Arc::new(arr)).unwrap();
assert_eq!(
drv.port_base.get_string_param(drv.params.codec, 0).unwrap(),
"bslz4"
);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.compressed_size, 0)
.unwrap(),
9
);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.bayer_pattern, 0)
.unwrap(),
crate::color::NDBayerPattern::GRBG as i32
);
}
#[test]
fn test_connect_downstream() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let (sender, mut receiver) = ndarray_channel("DOWNSTREAM", 10);
drv.connect_downstream(sender);
assert_eq!(drv.num_plugins(), 1);
let arr = drv
.pool
.alloc(
vec![crate::ndarray::NDDimension::new(8)],
crate::ndarray::NDDataType::UInt8,
)
.unwrap();
let id = arr.unique_id;
let to_publish = drv.prepare_array(Arc::new(arr)).unwrap().unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let _ = rt.block_on(drv.array_output.publish(to_publish));
let received = receiver.blocking_recv().unwrap();
assert_eq!(received.unique_id, id);
}
#[test]
fn test_create_file_path_recursive() {
let base = std::env::temp_dir().join(format!(
"ad_core_rs_cfp_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let nested = base.join("a").join("b").join("c");
let path = format!("{}/", nested.to_string_lossy());
NDArrayDriverBase::create_file_path(&path, 0).unwrap();
assert!(!nested.exists());
NDArrayDriverBase::create_file_path(&path, 1).unwrap();
assert!(nested.is_dir());
NDArrayDriverBase::create_file_path(&path, 1).unwrap();
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_read_nd_attributes_file_inline_xml() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let xml = r#"<Attributes>
<Attribute name="Gain" type="param" source="GAIN" description="detector gain"/>
<Attribute name="Comment" type="const" source="hello"/>
<Attribute name="Temp" type="EPICS_PV" source="$(P)Temp"/>
</Attributes>"#;
drv.port_base
.set_string_param(drv.params.attributes_file, 0, xml.into())
.unwrap();
drv.read_nd_attributes_file().unwrap();
assert_eq!(drv.attributes().len(), 3);
let gain = drv.attributes().get("Gain").unwrap();
assert!(matches!(gain.source, NDAttrSource::Param { .. }));
let comment = drv.attributes().get("Comment").unwrap();
assert_eq!(comment.value, NDAttrValue::String("hello".into()));
assert!(matches!(
drv.attributes().get("Temp").unwrap().source,
NDAttrSource::EpicsPV
));
assert_eq!(
drv.port_base
.get_int32_param(drv.params.attributes_status, 0)
.unwrap(),
ATTR_STATUS_OK
);
}
#[test]
fn test_param_attribute_reevaluates_from_param_library() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let xml = r#"<Attributes>
<Attribute name="Counter" type="PARAM" source="ARRAY_COUNTER" datatype="INT"/>
<Attribute name="Maker" type="PARAM" source="MANUFACTURER" datatype="STRING"/>
</Attributes>"#;
drv.port_base
.set_string_param(drv.params.attributes_file, 0, xml.into())
.unwrap();
drv.read_nd_attributes_file().unwrap();
drv.port_base
.set_int32_param(drv.params.array_counter, 0, 17)
.unwrap();
drv.port_base
.set_string_param(drv.params.manufacturer, 0, "ACME".into())
.unwrap();
let snap = drv.update_attributes();
assert_eq!(snap.get("Counter").unwrap().value, NDAttrValue::Int32(17));
assert_eq!(
snap.get("Maker").unwrap().value,
NDAttrValue::String("ACME".into())
);
drv.port_base
.set_int32_param(drv.params.array_counter, 0, 99)
.unwrap();
let snap2 = drv.update_attributes();
assert_eq!(snap2.get("Counter").unwrap().value, NDAttrValue::Int32(99));
}
#[test]
fn test_function_attribute_reevaluates_from_registry() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0));
let c = counter.clone();
drv.attr_functions.register("tick", move |param: &str| {
let n = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
NDAttrValue::String(format!("{param}={n}"))
});
let xml = r#"<Attributes>
<Attribute name="Live" type="FUNCTION" source="tick" param="seq"/>
</Attributes>"#;
drv.port_base
.set_string_param(drv.params.attributes_file, 0, xml.into())
.unwrap();
drv.read_nd_attributes_file().unwrap();
assert_eq!(
drv.attributes().get("Live").unwrap().value,
NDAttrValue::String("seq=1".into())
);
let snap = drv.update_attributes();
assert_eq!(
snap.get("Live").unwrap().value,
NDAttrValue::String("seq=2".into())
);
let snap2 = drv.update_attributes();
assert_eq!(
snap2.get("Live").unwrap().value,
NDAttrValue::String("seq=3".into())
);
}
#[test]
fn test_function_attribute_missing_function_is_undefined() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let xml = r#"<Attributes>
<Attribute name="Missing" type="FUNCTION" source="no_such_fn"/>
</Attributes>"#;
drv.port_base
.set_string_param(drv.params.attributes_file, 0, xml.into())
.unwrap();
drv.read_nd_attributes_file().unwrap();
let snap = drv.update_attributes();
assert_eq!(snap.get("Missing").unwrap().value, NDAttrValue::Undefined);
}
#[test]
fn test_read_nd_attributes_file_empty_is_ok() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
drv.read_nd_attributes_file().unwrap();
assert_eq!(drv.attributes().len(), 0);
assert_eq!(
drv.port_base
.get_int32_param(drv.params.attributes_status, 0)
.unwrap(),
ATTR_STATUS_OK
);
}
#[test]
fn test_read_nd_attributes_file_missing_file() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
drv.port_base
.set_string_param(
drv.params.attributes_file,
0,
"/nonexistent_attrs_xyz.xml".into(),
)
.unwrap();
assert!(drv.read_nd_attributes_file().is_err());
assert_eq!(
drv.port_base
.get_int32_param(drv.params.attributes_status, 0)
.unwrap(),
ATTR_STATUS_FILE_NOT_FOUND
);
}
#[test]
fn test_write_octet_file_path_creates_dir() {
let mut drv = NDArrayDriverBase::new("TEST", 1_000_000).unwrap();
let base = std::env::temp_dir().join(format!(
"ad_core_rs_wo_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let target = base.join("sub");
let path = format!("{}/", target.to_string_lossy());
drv.port_base
.set_int32_param(drv.params.create_dir, 0, 1)
.unwrap();
drv.port_base
.set_string_param(drv.params.file_path, 0, path.clone())
.unwrap();
let handled = drv.write_octet(drv.params.file_path, &path).unwrap();
assert!(handled);
assert!(target.is_dir());
assert_eq!(
drv.port_base
.get_int32_param(drv.params.file_path_exists, 0)
.unwrap(),
1
);
let _ = std::fs::remove_dir_all(&base);
}
}