use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use asyn_rs::error::AsynResult;
use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
use asyn_rs::runtime::config::RuntimeConfig;
use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
use asyn_rs::user::AsynUser;
use asyn_rs::port_handle::PortHandle;
use crate::ndarray::NDArray;
use crate::ndarray_pool::NDArrayPool;
use crate::params::ndarray_driver::NDArrayDriverParams;
use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
use super::params::PluginBaseParams;
use super::wiring::{WiringRegistry, upstream_key};
/// Value carried by a parameter write that is forwarded to the plugin data thread.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// 32-bit integer write.
    Int32(i32),
    /// Double-precision float write.
    Float64(f64),
    /// String write.
    Octet(String),
}

impl ParamChangeValue {
    /// Returns the value as an `i32`.
    ///
    /// Floats are converted with an `as` cast; strings yield `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(int) => int,
            Self::Float64(float) => float as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Returns the value as an `f64`.
    ///
    /// Integers are widened; strings yield `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(int) => int as f64,
            Self::Float64(float) => float,
            Self::Octet(_) => 0.0,
        }
    }

    /// Returns the string payload, or `None` for numeric variants.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
/// A parameter value a processor asks the runtime to write back to the port.
///
/// Every variant carries the parameter index (`reason`), the asyn address
/// (`addr`) and the new value.
pub enum ParamUpdate {
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Integer update at address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Float update at address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Integer update at an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        ParamUpdate::Int32 { reason, addr, value }
    }

    /// Float update at an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        ParamUpdate::Float64 { reason, addr, value }
    }

    /// Float-array update at address 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Float-array update at an explicit address.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        ParamUpdate::Float64Array { reason, addr, value }
    }

    /// String update at address 0.
    pub fn octet(reason: usize, value: String) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// String update at an explicit address.
    pub fn octet_addr(reason: usize, addr: i32, value: String) -> Self {
        ParamUpdate::Octet { reason, addr, value }
    }
}
/// What a processor returns for one input array.
pub struct ProcessResult {
    /// Arrays to hand to downstream consumers.
    pub output_arrays: Vec<Arc<NDArray>>,
    /// Parameter values to write back to the port.
    pub param_updates: Vec<ParamUpdate>,
    /// When `Some`, the arrays go to a single downstream sender chosen by this
    /// index instead of to all of them.
    pub scatter_index: Option<usize>,
}

impl ProcessResult {
    /// Result of a plugin that produces no arrays, only parameter updates.
    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
        Self {
            param_updates,
            ..Self::empty()
        }
    }

    /// Result carrying only output arrays, delivered to every sender.
    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
        Self {
            output_arrays,
            ..Self::empty()
        }
    }

    /// Result with no arrays, no updates and no scatter index.
    pub fn empty() -> Self {
        Self {
            output_arrays: Vec::new(),
            param_updates: Vec::new(),
            scatter_index: None,
        }
    }

    /// Result whose arrays are routed to the sender selected by `index`.
    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
        Self {
            output_arrays,
            scatter_index: Some(index),
            ..Self::empty()
        }
    }
}
/// What a processor returns from `on_param_change`.
pub struct ParamChangeResult {
    /// Arrays produced in response to the parameter change.
    pub output_arrays: Vec<Arc<NDArray>>,
    /// Parameter values to write back to the port.
    pub param_updates: Vec<ParamUpdate>,
}

impl ParamChangeResult {
    /// Only parameter updates, no arrays.
    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
        Self::combined(Vec::new(), param_updates)
    }

    /// Only arrays, no parameter updates.
    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
        Self::combined(output_arrays, Vec::new())
    }

    /// Both arrays and parameter updates.
    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
        ParamChangeResult {
            output_arrays,
            param_updates,
        }
    }

    /// Nothing to publish.
    pub fn empty() -> Self {
        Self::combined(Vec::new(), Vec::new())
    }
}
/// Processing logic plugged into the plugin runtime.
///
/// Implementors must be `Send + 'static` because the runtime moves the
/// processor onto a dedicated data thread.
pub trait NDPluginProcess: Send + 'static {
/// Processes one input array and returns the arrays and parameter updates to publish.
///
/// `pool` is the array pool the runtime was created with.
fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
/// Name of the plugin type; the runtime stores it in the port's plugin-type string parameter.
fn plugin_type(&self) -> &str;
/// Whether this processor accepts arrays that carry a codec (compressed data).
///
/// Defaults to `false`; the data loop then counts such arrays as dropped
/// instead of calling `process_array`.
fn compression_aware(&self) -> bool {
false
}
/// Hook for creating plugin-specific parameters on the port driver base.
///
/// Called once while the port driver is constructed. The default registers nothing.
fn register_params(
&mut self,
_base: &mut PortDriverBase,
) -> Result<(), asyn_rs::error::AsynError> {
Ok(())
}
/// Hook invoked when a parameter changes; the default does nothing.
// NOTE(review): the call site is outside the visible part of this file — confirm
// when and with which `_reason` values this is invoked.
fn on_param_change(
&mut self,
_reason: usize,
_params: &PluginParamSnapshot,
) -> ParamChangeResult {
ParamChangeResult::empty()
}
/// Optional shared slot holding the latest array for waveform-style reads.
///
/// When this returns `Some`, the port driver creates a `STD_ARRAY_DATA`
/// parameter and serves `read_*_array` requests from the slot. Defaults to `None`.
fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
None
}
}
/// Context handed to `NDPluginProcess::on_param_change`.
// NOTE(review): this struct is constructed outside the visible part of the file;
// the field descriptions below are inferred from names and types — confirm.
pub struct PluginParamSnapshot {
/// Presumably the current state of the enable-callbacks flag.
pub enable_callbacks: bool,
/// Parameter index of the change.
pub reason: usize,
/// asyn address of the change.
pub addr: i32,
/// The value that was written.
pub value: ParamChangeValue,
}
/// Arrays buffered under one unique id while waiting to be emitted in order.
struct SortEntry {
    arrays: Vec<Arc<NDArray>>,
    /// Creation time of the bucket; `drain_ready` compares it with the sort timeout.
    inserted: std::time::Instant,
}

/// Reorder buffer for output arrays, keyed (and therefore ordered) by unique id.
struct SortBuffer {
    entries: BTreeMap<i32, SortEntry>,
    /// Unique id of the most recently emitted array.
    prev_unique_id: i32,
    /// `true` until the first array has been emitted.
    first_output: bool,
    /// Number of arrays emitted out of sequence.
    disordered_arrays: i32,
    /// Number of arrays dropped on the output side (for example, buffer full).
    dropped_output_arrays: i32,
}

impl SortBuffer {
    /// Creates an empty buffer that has not emitted anything yet.
    fn new() -> Self {
        Self {
            entries: BTreeMap::new(),
            prev_unique_id: 0,
            first_output: true,
            disordered_arrays: 0,
            dropped_output_arrays: 0,
        }
    }

    /// Returns `true` when `unique_id` repeats or directly follows the last emitted id.
    ///
    /// Uses `checked_add` so that a previous id of `i32::MAX` has no successor
    /// instead of overflowing (which panics in debug builds).
    fn order_ok(&self, unique_id: i32) -> bool {
        unique_id == self.prev_unique_id || self.prev_unique_id.checked_add(1) == Some(unique_id)
    }

    /// Records that `unique_id` was emitted, counting it as disordered when it
    /// is out of sequence (never for the very first emitted array).
    fn note_emitted(&mut self, unique_id: i32) {
        if !self.first_output && !self.order_ok(unique_id) {
            self.disordered_arrays += 1;
        }
        self.first_output = false;
        self.prev_unique_id = unique_id;
    }

    /// Buffers `arrays` under `unique_id`.
    ///
    /// Returns `false` and bumps `dropped_output_arrays` when `sort_size` is
    /// positive and the buffer already holds that many buckets. Arrays for an
    /// id that is already buffered are appended to the existing bucket, which
    /// keeps its original insertion time.
    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) -> bool {
        if sort_size > 0 && self.entries.len() as i32 >= sort_size {
            self.dropped_output_arrays += 1;
            return false;
        }
        self.entries
            .entry(unique_id)
            .or_insert_with(|| SortEntry {
                arrays: Vec::new(),
                inserted: std::time::Instant::now(),
            })
            .arrays
            .extend(arrays);
        true
    }

    /// Removes and returns buckets from the low-id end for as long as the head
    /// bucket is either next in sequence or has waited longer than `sort_time`
    /// seconds.
    fn drain_ready(&mut self, sort_time: f64) -> Vec<(i32, Vec<Arc<NDArray>>)> {
        let now = std::time::Instant::now();
        let mut out = Vec::new();
        while let Some((&head_id, entry)) = self.entries.iter().next() {
            let delta = now.duration_since(entry.inserted).as_secs_f64();
            let in_sequence = !self.first_output && self.order_ok(head_id);
            if !(in_sequence || delta > sort_time) {
                break;
            }
            let entry = self
                .entries
                .remove(&head_id)
                .expect("head key was observed in the map just above");
            self.note_emitted(head_id);
            out.push((head_id, entry.arrays));
        }
        out
    }

    /// Empties the buffer, returning every bucket in ascending id order.
    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
        let entries = std::mem::take(&mut self.entries);
        let mut out = Vec::with_capacity(entries.len());
        for (id, entry) in entries {
            self.note_emitted(id);
            out.push((id, entry.arrays));
        }
        out
    }

    /// Number of buffered buckets.
    fn len(&self) -> i32 {
        self.entries.len() as i32
    }
}
/// State shared (behind a mutex) between the plugin data loop and its helpers:
/// the processor itself plus throttling, sorting and status bookkeeping.
struct SharedProcessorInner<P: NDPluginProcess> {
// The user-supplied processing logic.
processor: P,
// Downstream senders; cloned by the data loop before publishing.
output: Arc<parking_lot::Mutex<NDArrayOutput>>,
// Pool passed to `process_array`.
pool: Arc<NDArrayPool>,
// Parameter indices for the standard array parameters and the plugin base parameters.
ndarray_params: NDArrayDriverParams,
plugin_params: PluginBaseParams,
// Handle used for interrupt notifications and parameter flushes.
port_handle: PortHandle,
// Incremented once per published batch that has an array to report.
array_counter: i32,
// Index of the STD_ARRAY_DATA parameter, present only when the processor
// exposes an array-data handle.
std_array_data_param: Option<usize>,
// Minimum time in seconds between processed arrays; values <= 0 disable the check.
min_callback_time: f64,
// Start time of the most recent `process_array` call.
last_process_time: Option<std::time::Instant>,
// Output sorting settings; a `sort_mode` of 0 means arrays are emitted unsorted.
sort_mode: i32,
sort_time: f64,
sort_size: i32,
sort_buffer: SortBuffer,
// Dropped-input counter; the runtime initialises it from the array sender's counter.
dropped_arrays: Arc<std::sync::atomic::AtomicI32>,
// Cached result of `processor.compression_aware()`.
compression_aware: bool,
// Output byte-rate limit; exactly 0.0 disables throttling.
max_byte_rate: f64,
throttler: super::throttler::Throttler,
// Last array accepted for processing; re-used by `process_plugin`.
prev_input_array: Option<Arc<NDArray>>,
// Dimensions last reported through the array-dimensions interrupt.
dims_prev: Vec<i32>,
// Values mirrored from the corresponding parameters by the data loop.
nd_array_addr: i32,
max_threads: i32,
num_threads: i32,
}
impl<P: NDPluginProcess> SharedProcessorInner<P> {
/// Returns `true` when the previous array was processed less than
/// `min_callback_time` seconds ago. A non-positive limit disables the check,
/// as does the absence of any previously processed array.
fn should_throttle(&self) -> bool {
    let min_interval = self.min_callback_time;
    if min_interval <= 0.0 {
        return false;
    }
    match self.last_process_time {
        Some(previous) => previous.elapsed().as_secs_f64() < min_interval,
        None => false,
    }
}
/// Byte cost charged against the output throttle: the compressed size when
/// the array carries a codec, otherwise its total byte size.
fn array_byte_cost(array: &NDArray) -> f64 {
    if let Some(codec) = &array.codec {
        codec.compressed_size as f64
    } else {
        array.info().total_bytes as f64
    }
}
/// Asks the byte-rate throttler whether `array` may be emitted.
///
/// A `max_byte_rate` of exactly 0.0 disables throttling. A refused array is
/// counted in `dropped_output_arrays`.
fn throttle_ok(&mut self, array: &NDArray) -> bool {
    if self.max_byte_rate == 0.0 {
        return true;
    }
    let admitted = self.throttler.try_take(Self::array_byte_cost(array));
    if !admitted {
        self.sort_buffer.dropped_output_arrays += 1;
    }
    admitted
}
/// Decides, for each processed array, whether it is emitted now, parked in
/// the sort buffer, or dropped by the throttle; returns the arrays to emit.
///
/// With sorting enabled (`sort_mode != 0`), out-of-sequence arrays are
/// buffered, and any buckets that have become ready are appended afterwards.
fn route_output_arrays(&mut self, arrays: Vec<Arc<NDArray>>) -> Vec<Arc<NDArray>> {
    let sorting = self.sort_mode != 0;
    let mut emitted = Vec::new();
    for candidate in arrays {
        if !self.throttle_ok(&candidate) {
            continue;
        }
        let id = candidate.unique_id;
        // Short-circuit order is kept: the sequence check only runs when sorting.
        let must_buffer =
            sorting && !self.sort_buffer.first_output && !self.sort_buffer.order_ok(id);
        if must_buffer {
            // A full buffer drops the array; the buffer counts that itself.
            self.sort_buffer.insert(id, vec![candidate], self.sort_size);
        } else {
            self.sort_buffer.note_emitted(id);
            emitted.push(candidate);
        }
    }
    if sorting {
        for (_, bucket) in self.sort_buffer.drain_ready(self.sort_time) {
            emitted.extend(bucket);
        }
    }
    emitted
}
/// Runs the processor on `array` and assembles everything to publish.
///
/// When the minimum-callback-time check rejects the array, it is counted as
/// dropped and only the status parameters are returned. Every visible path
/// returns `Some`.
fn process_and_publish(&mut self, array: &Arc<NDArray>) -> Option<ProcessOutput> {
if self.should_throttle() {
self.dropped_arrays
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
return Some(self.dropped_arrays_only_batch());
}
// Remember the input so `process_plugin` can re-run it later.
self.prev_input_array = Some(Arc::clone(array));
let t0 = std::time::Instant::now();
let result = self.processor.process_array(array, &self.pool);
// Execution time is reported in milliseconds.
let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
// The throttle interval is measured from the start of processing.
self.last_process_time = Some(t0);
let ready = self.route_output_arrays(result.output_arrays);
// The input array is the fallback for status reporting when nothing is emitted.
let mut output = self.build_publish_batch(
ready,
result.param_updates,
result.scatter_index,
Some(array.as_ref()),
elapsed_ms,
);
output.batch.merge(self.build_status_params_batch());
Some(output)
}
/// Output for a dropped input: no arrays, only the current status parameters.
fn dropped_arrays_only_batch(&self) -> ProcessOutput {
    let batch = self.build_status_params_batch();
    ProcessOutput {
        arrays: Vec::new(),
        scatter_index: None,
        batch,
    }
}
/// Re-processes the most recently accepted input array; returns `None` when
/// no input has been remembered.
fn process_plugin(&mut self) -> Option<ProcessOutput> {
    match self.prev_input_array.clone() {
        Some(last_input) => self.process_and_publish(&last_input),
        None => None,
    }
}
/// Emits whichever sort-buffer buckets are ready under the current sort timeout.
fn tick_sort_buffer(&mut self) -> ProcessOutput {
    let timeout = self.sort_time;
    let ready = self.sort_buffer.drain_ready(timeout);
    self.emit_drained(ready)
}
/// Emits every bucket still held in the sort buffer, ready or not.
fn flush_sort_buffer(&mut self) -> ProcessOutput {
    let everything = self.sort_buffer.drain_all();
    self.emit_drained(everything)
}
/// Turns drained sort-buffer buckets into a single output.
///
/// Each bucket is run through `build_publish_batch`, then the arrays and
/// parameter batches are concatenated and the sort status parameters appended.
fn emit_drained(&mut self, entries: Vec<(i32, Vec<Arc<NDArray>>)>) -> ProcessOutput {
    let mut arrays = Vec::new();
    let mut batch = ParamBatch::empty();
    for (_, bucket) in entries {
        let ProcessOutput {
            arrays: published,
            batch: bucket_batch,
            ..
        } = self.build_publish_batch(bucket, Vec::new(), None, None, 0.0);
        arrays.extend(published);
        batch.merge(bucket_batch);
    }
    batch.merge(self.build_sort_params_batch());
    ProcessOutput {
        arrays,
        scatter_index: None,
        batch,
    }
}
/// Builds the address-0 updates for the sort status parameters: free slots
/// (`sort_size` minus buffered buckets), disordered count and dropped-output count.
fn build_sort_params_batch(&self) -> ParamBatch {
    use asyn_rs::request::ParamSetValue;
    let params = &self.plugin_params;
    let buffer = &self.sort_buffer;
    let int_at_addr0 = |reason: usize, value: i32| ParamSetValue::Int32 {
        reason,
        addr: 0,
        value,
    };
    let addr0 = vec![
        int_at_addr0(params.sort_free, self.sort_size - buffer.len()),
        int_at_addr0(params.disordered_arrays, buffer.disordered_arrays),
        int_at_addr0(params.dropped_output_arrays, buffer.dropped_output_arrays),
    ];
    ParamBatch {
        addr0,
        extra: std::collections::HashMap::new(),
    }
}
/// Sort status parameters plus the current dropped-input counter.
fn build_status_params_batch(&self) -> ParamBatch {
    use asyn_rs::request::ParamSetValue;
    let mut status = self.build_sort_params_batch();
    let dropped = self
        .dropped_arrays
        .load(std::sync::atomic::Ordering::Acquire);
    status.addr0.push(ParamSetValue::Int32 {
        reason: self.plugin_params.dropped_arrays,
        addr: 0,
        value: dropped,
    });
    status
}
/// Assembles the output for one publish step.
///
/// The array used for status reporting is the first of `output_arrays`, or
/// `fallback_array` when nothing is emitted. When such an array exists this
/// method:
/// * increments the array counter,
/// * sends the array data as an interrupt on the `STD_ARRAY_DATA` parameter
///   (if that parameter exists); unsigned buffers are reinterpreted as the
///   signed type of the same width via `as`,
/// * sends an array-dimensions interrupt when the dimensions changed,
/// * queues the standard per-array parameters at address 0.
///
/// The execution time (`elapsed_ms`) is always queued. `param_updates` are
/// routed to the address-0 list or to the per-address map. The vector is
/// consumed, so string and array payloads are moved rather than cloned.
fn build_publish_batch(
    &mut self,
    output_arrays: Vec<Arc<NDArray>>,
    param_updates: Vec<ParamUpdate>,
    scatter_index: Option<usize>,
    fallback_array: Option<&NDArray>,
    elapsed_ms: f64,
) -> ProcessOutput {
    use asyn_rs::request::ParamSetValue;
    let mut addr0: Vec<ParamSetValue> = Vec::new();
    let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
        std::collections::HashMap::new();
    if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
        self.array_counter += 1;
        if let Some(param) = self.std_array_data_param {
            use crate::ndarray::NDDataBuffer;
            use asyn_rs::param::ParamValue;
            // Unsigned buffers are collected straight into the shared slice,
            // avoiding an intermediate Vec.
            let value = match &report_arr.data {
                NDDataBuffer::I8(v) => ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())),
                NDDataBuffer::U8(v) => {
                    ParamValue::Int8Array(v.iter().map(|&x| x as i8).collect())
                }
                NDDataBuffer::I16(v) => {
                    ParamValue::Int16Array(std::sync::Arc::from(v.as_slice()))
                }
                NDDataBuffer::U16(v) => {
                    ParamValue::Int16Array(v.iter().map(|&x| x as i16).collect())
                }
                NDDataBuffer::I32(v) => {
                    ParamValue::Int32Array(std::sync::Arc::from(v.as_slice()))
                }
                NDDataBuffer::U32(v) => {
                    ParamValue::Int32Array(v.iter().map(|&x| x as i32).collect())
                }
                NDDataBuffer::I64(v) => {
                    ParamValue::Int64Array(std::sync::Arc::from(v.as_slice()))
                }
                NDDataBuffer::U64(v) => {
                    ParamValue::Int64Array(v.iter().map(|&x| x as i64).collect())
                }
                NDDataBuffer::F32(v) => {
                    ParamValue::Float32Array(std::sync::Arc::from(v.as_slice()))
                }
                NDDataBuffer::F64(v) => {
                    ParamValue::Float64Array(std::sync::Arc::from(v.as_slice()))
                }
            };
            let ts = report_arr.timestamp.to_system_time();
            self.port_handle
                .interrupts()
                .notify(asyn_rs::interrupt::InterruptValue {
                    reason: param,
                    addr: 0,
                    value,
                    timestamp: ts,
                    uint32_changed_mask: 0,
                    ..Default::default()
                });
        }
        let info = report_arr.info();
        // Attributes, when present, override the values derived from the array itself.
        let color_mode = report_arr
            .attributes
            .get("ColorMode")
            .and_then(|a| a.value.as_i64())
            .map(|v| v as i32)
            .unwrap_or(info.color_mode as i32);
        let bayer_pattern = report_arr
            .attributes
            .get("BayerPattern")
            .and_then(|a| a.value.as_i64())
            .map(|v| v as i32)
            .unwrap_or(0);
        let cur_dims: Vec<i32> = report_arr.dims.iter().map(|d| d.size as i32).collect();
        // Only notify about dimensions when they differ from the last report.
        if cur_dims != self.dims_prev {
            self.dims_prev = cur_dims.clone();
            self.port_handle
                .interrupts()
                .notify(asyn_rs::interrupt::InterruptValue {
                    reason: self.ndarray_params.array_dimensions,
                    addr: 0,
                    value: asyn_rs::param::ParamValue::Int32Array(std::sync::Arc::from(
                        cur_dims.as_slice(),
                    )),
                    timestamp: report_arr.timestamp.to_system_time(),
                    uint32_changed_mask: 0,
                    ..Default::default()
                });
        }
        addr0.extend([
            ParamSetValue::Int32 {
                reason: self.ndarray_params.array_counter,
                addr: 0,
                value: self.array_counter,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.unique_id,
                addr: 0,
                value: report_arr.unique_id,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.n_dimensions,
                addr: 0,
                value: report_arr.dims.len() as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.array_size_x,
                addr: 0,
                value: info.x_size as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.array_size_y,
                addr: 0,
                value: info.y_size as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.array_size_z,
                addr: 0,
                value: info.color_size as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.array_size,
                addr: 0,
                value: info.total_bytes as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.data_type,
                addr: 0,
                value: report_arr.data.data_type() as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.color_mode,
                addr: 0,
                value: color_mode,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.bayer_pattern,
                addr: 0,
                value: bayer_pattern,
            },
            ParamSetValue::Float64 {
                reason: self.ndarray_params.timestamp_rbv,
                addr: 0,
                value: report_arr.timestamp.as_f64(),
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.epics_ts_sec,
                addr: 0,
                value: report_arr.timestamp.sec as i32,
            },
            ParamSetValue::Int32 {
                reason: self.ndarray_params.epics_ts_nsec,
                addr: 0,
                value: report_arr.timestamp.nsec as i32,
            },
        ]);
    }
    addr0.push(ParamSetValue::Float64 {
        reason: self.plugin_params.execution_time,
        addr: 0,
        value: elapsed_ms,
    });
    for update in param_updates {
        // Convert once, then route by address; payloads are moved, not cloned.
        let (addr, pv) = match update {
            ParamUpdate::Int32 {
                reason,
                addr,
                value,
            } => (
                addr,
                ParamSetValue::Int32 {
                    reason,
                    addr,
                    value,
                },
            ),
            ParamUpdate::Float64 {
                reason,
                addr,
                value,
            } => (
                addr,
                ParamSetValue::Float64 {
                    reason,
                    addr,
                    value,
                },
            ),
            ParamUpdate::Octet {
                reason,
                addr,
                value,
            } => (
                addr,
                ParamSetValue::Octet {
                    reason,
                    addr,
                    value,
                },
            ),
            ParamUpdate::Float64Array {
                reason,
                addr,
                value,
            } => (
                addr,
                ParamSetValue::Float64Array {
                    reason,
                    addr,
                    value,
                },
            ),
        };
        if addr == 0 {
            addr0.push(pv);
        } else {
            extra.entry(addr).or_default().push(pv);
        }
    }
    ProcessOutput {
        arrays: output_arrays,
        scatter_index,
        batch: ParamBatch { addr0, extra },
    }
}
}
/// Everything one processing step wants published: arrays for downstream
/// senders and a batch of parameter updates for the port.
struct ProcessOutput {
    arrays: Vec<Arc<NDArray>>,
    /// When `Some`, all arrays go to the single sender at this index
    /// (modulo the number of senders) instead of to every sender.
    scatter_index: Option<usize>,
    batch: ParamBatch,
}

impl ProcessOutput {
    /// Publishes the arrays, one after another, to `senders`.
    ///
    /// In scatter mode only the selected sender receives them; otherwise each
    /// array is published to all senders concurrently.
    async fn publish_arrays(&self, senders: &[NDArraySender]) {
        match self.scatter_index {
            Some(idx) => {
                // `max(1)` guards the modulo when there are no senders; the
                // lookup then simply finds nothing.
                let target = senders.get(idx % senders.len().max(1));
                for arr in &self.arrays {
                    if let Some(sender) = target {
                        sender.publish(arr.clone()).await;
                    }
                }
            }
            None => {
                for arr in &self.arrays {
                    let pending = senders.iter().map(|s| s.publish(arr.clone()));
                    futures_util::future::join_all(pending).await;
                }
            }
        }
    }
}
/// Parameter updates grouped by asyn address: address 0 in `addr0`, all other
/// addresses in `extra`.
struct ParamBatch {
    addr0: Vec<asyn_rs::request::ParamSetValue>,
    extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
}

impl ParamBatch {
    /// A batch with no updates.
    fn empty() -> Self {
        ParamBatch {
            addr0: Vec::new(),
            extra: std::collections::HashMap::new(),
        }
    }

    /// Appends every update of `other` to this batch, address by address.
    fn merge(&mut self, other: ParamBatch) {
        let ParamBatch { addr0, extra } = other;
        self.addr0.extend(addr0);
        for (addr, updates) in extra {
            self.extra.entry(addr).or_default().extend(updates);
        }
    }

    /// Writes the batch to `port`, address 0 first (skipped when empty).
    ///
    /// Failures are reported on stderr and do not stop the remaining addresses.
    async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
        let ParamBatch { addr0, extra } = self;
        if !addr0.is_empty() {
            match port.set_params_and_notify(0, addr0).await {
                Ok(_) => {}
                Err(e) => eprintln!("plugin param flush error (addr 0): {e}"),
            }
        }
        for (addr, updates) in extra {
            match port.set_params_and_notify(addr, updates).await {
                Ok(_) => {}
                Err(e) => eprintln!("plugin param flush error (addr {addr}): {e}"),
            }
        }
    }
}
/// asyn port driver backing a plugin: owns the parameter library and forwards
/// parameter writes to the plugin data thread.
#[allow(dead_code)]
pub struct PluginPortDriver {
base: PortDriverBase,
ndarray_params: NDArrayDriverParams,
plugin_params: PluginBaseParams,
// Channel of (reason, addr, value) tuples consumed by the plugin data loop.
param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
// Slot holding the latest array for `read_*_array`; `None` when the
// processor exposes no array data.
array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
// Index of the STD_ARRAY_DATA parameter; created only when `array_data` is `Some`.
std_array_data_param: Option<usize>,
}
impl PluginPortDriver {
/// Creates the driver, registers the standard and plugin base parameters,
/// seeds their initial values and lets the processor register its own.
///
/// # Errors
/// Propagates any error from parameter creation or initialisation, or from
/// `processor.register_params`.
fn new<P: NDPluginProcess>(
port_name: &str,
plugin_type_name: &str,
queue_size: usize,
ndarray_port: &str,
max_addr: usize,
param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
processor: &mut P,
array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
) -> AsynResult<Self> {
let mut base = PortDriverBase::new(
port_name,
max_addr,
PortFlags {
can_block: true,
..Default::default()
},
);
let ndarray_params = NDArrayDriverParams::create(&mut base)?;
let plugin_params = PluginBaseParams::create(&mut base)?;
// Plugin base defaults: callbacks disabled, non-blocking, counters at zero.
base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
base.set_int32_param(plugin_params.queue_use, 0, 0)?;
base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
// Standard array/file parameter defaults.
base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
base.set_int32_param(ndarray_params.write_file, 0, 0)?;
base.set_int32_param(ndarray_params.read_file, 0, 0)?;
base.set_int32_param(ndarray_params.capture, 0, 0)?;
base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
base.set_string_param(ndarray_params.file_path, 0, "".into())?;
base.set_string_param(ndarray_params.file_name, 0, "".into())?;
base.set_int32_param(ndarray_params.file_number, 0, 0)?;
base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
// Both version strings report this crate's package version.
base.set_string_param(
ndarray_params.ad_core_version,
0,
env!("CARGO_PKG_VERSION").into(),
)?;
base.set_string_param(
ndarray_params.driver_version,
0,
env!("CARGO_PKG_VERSION").into(),
)?;
// The upstream port name is only recorded when one was given.
if !ndarray_port.is_empty() {
base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
}
let std_array_data_param = if array_data.is_some() {
Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
} else {
None
};
// Processor-specific parameters are created after all built-in ones.
processor.register_params(&mut base)?;
Ok(Self {
base,
ndarray_params,
plugin_params,
param_change_tx,
array_data,
std_array_data_param,
})
}
}
/// Copies as many leading elements of `src` as fit into `dst` and returns
/// that count; elements of `dst` beyond it are left untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = dst.len().min(src.len());
    let (target, _) = dst.split_at_mut(count);
    target.copy_from_slice(&src[..count]);
    count
}
/// Converts leading elements of `src` into `dst` by way of `f64` and returns
/// how many were written (the shorter of the two lengths).
fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
where
    S: CastToF64 + Copy,
    D: CastFromF64 + Copy,
{
    // `zip` stops at the shorter slice, so exactly `min(len, len)` slots are written.
    for (slot, &item) in dst.iter_mut().zip(src.iter()) {
        *slot = D::cast_from_f64(item.cast_to_f64());
    }
    src.len().min(dst.len())
}
/// Integer-to-integer conversion using a plain `as` cast.
// NOTE(review): presumably kept separate from the f64-based path so that
// integer reads keep `as` wrap/truncate semantics instead of the saturating
// float-to-int conversion — confirm.
trait CCastTo<D> {
fn ccast(self) -> D;
}
/// Implements `CCastTo<$dst>` for `$src` for each listed destination type.
macro_rules! impl_ccast {
( $src:ty => $( $dst:ty ),+ ) => {
$(
impl CCastTo<$dst> for $src {
#[inline]
fn ccast(self) -> $dst {
self as $dst
}
}
)+
};
}
// Every integer source type converts to each signed destination type other
// than itself; same-type reads use `copy_direct` instead.
impl_ccast!(i8 => i16, i32, i64);
impl_ccast!(u8 => i8, i16, i32, i64);
impl_ccast!(i16 => i8, i32, i64);
impl_ccast!(u16 => i8, i16, i32, i64);
impl_ccast!(i32 => i8, i16, i64);
impl_ccast!(u32 => i8, i16, i32, i64);
impl_ccast!(i64 => i8, i16, i32);
impl_ccast!(u64 => i8, i16, i32, i64);
/// Converts leading elements of `src` into `dst` with `CCastTo::ccast` and
/// returns how many were written (the shorter of the two lengths).
fn copy_ccast<S, D>(src: &[S], dst: &mut [D]) -> usize
where
    S: CCastTo<D> + Copy,
    D: Copy,
{
    // `zip` stops at the shorter slice.
    for (slot, &item) in dst.iter_mut().zip(src.iter()) {
        *slot = item.ccast();
    }
    src.len().min(dst.len())
}
/// Conversion of a numeric element to `f64`.
trait CastToF64 {
    fn cast_to_f64(self) -> f64;
}

/// Implements `CastToF64` via an `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64 {
    ( $( $t:ty ),+ $(,)? ) => {
        $(
            impl CastToF64 for $t {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )+
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

// `f64` is already the target type.
impl CastToF64 for f64 {
    fn cast_to_f64(self) -> f64 {
        self
    }
}
/// Conversion of an `f64` into a numeric element type.
trait CastFromF64 {
    fn cast_from_f64(v: f64) -> Self;
}

/// Implements `CastFromF64` via an `as` cast for each listed type.
macro_rules! impl_cast_from_f64 {
    ( $( $t:ty ),+ $(,)? ) => {
        $(
            impl CastFromF64 for $t {
                fn cast_from_f64(v: f64) -> Self {
                    v as $t
                }
            }
        )+
    };
}

impl_cast_from_f64!(i8, i16, i32, i64, f32);

// `f64` needs no conversion.
impl CastFromF64 for f64 {
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
/// Body of the `read_*_array` methods.
///
/// Reads the latest array from `$self.array_data` and copies it into `$buf`:
/// `$direct_variant` is copied as-is, the `ccast` variants go through
/// `copy_ccast`, and the `convert` variants through `copy_convert`.
/// Evaluates to `Ok(n)` with the number of elements written. It returns
/// `Ok(0)` from the enclosing function when there is no data handle or no
/// array yet, so it must be used as the body of a function returning a
/// compatible `Result`.
macro_rules! impl_read_array {
(
$self:expr, $buf:expr, $direct_variant:ident,
ccast: [ $( $ccast_variant:ident ),* ],
convert: [ $( $variant:ident ),* ]
) => {{
use crate::ndarray::NDDataBuffer;
let handle = match &$self.array_data {
Some(h) => h,
None => return Ok(0),
};
// The lock is held for the duration of the copy.
let guard = handle.lock();
let array = match &*guard {
Some(a) => a,
None => return Ok(0),
};
let n = match &array.data {
NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
$( NDDataBuffer::$ccast_variant(v) => copy_ccast(v, $buf), )*
$( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
};
Ok(n)
}};
}
/// asyn entry points of the plugin port.
///
/// Scalar and string writes are stored in the parameter library, announced
/// through the parameter callbacks and then forwarded to the data thread.
/// Array reads are served from the processor's shared array slot.
impl PortDriver for PluginPortDriver {
    fn base(&self) -> &PortDriverBase {
        &self.base
    }

    fn base_mut(&mut self) -> &mut PortDriverBase {
        &mut self.base
    }

    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
        let (reason, addr) = (user.reason, user.addr);
        self.base.set_int32_param(reason, addr, value)?;
        self.base.call_param_callbacks(addr)?;
        // Best effort: a failed send to the data thread is ignored.
        let change = ParamChangeValue::Int32(value);
        self.param_change_tx.send((reason, addr, change)).ok();
        Ok(())
    }

    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
        let (reason, addr) = (user.reason, user.addr);
        self.base.set_float64_param(reason, addr, value)?;
        self.base.call_param_callbacks(addr)?;
        // Best effort: a failed send to the data thread is ignored.
        let change = ParamChangeValue::Float64(value);
        self.param_change_tx.send((reason, addr, change)).ok();
        Ok(())
    }

    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
        let (reason, addr) = (user.reason, user.addr);
        // Invalid UTF-8 sequences are replaced rather than rejected.
        let text = String::from_utf8_lossy(data).into_owned();
        self.base.set_string_param(reason, addr, text.clone())?;
        self.base.call_param_callbacks(addr)?;
        // Best effort: a failed send to the data thread is ignored.
        let change = ParamChangeValue::Octet(text);
        self.param_change_tx.send((reason, addr, change)).ok();
        Ok(())
    }

    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
        impl_read_array!(self, buf, I8, ccast: [U8, I16, U16, I32, U32, I64, U64], convert: [F32, F64])
    }

    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
        impl_read_array!(self, buf, I16, ccast: [I8, U8, U16, I32, U32, I64, U64], convert: [F32, F64])
    }

    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
        impl_read_array!(self, buf, I32, ccast: [I8, U8, I16, U16, U32, I64, U64], convert: [F32, F64])
    }

    fn read_int64_array(&mut self, _user: &AsynUser, buf: &mut [i64]) -> AsynResult<usize> {
        impl_read_array!(self, buf, I64, ccast: [I8, U8, I16, U16, I32, U32, U64], convert: [F32, F64])
    }

    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
        impl_read_array!(self, buf, F32, ccast: [], convert: [I8, U8, I16, U16, I32, U32, I64, U64, F64])
    }

    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
        impl_read_array!(self, buf, F64, ccast: [], convert: [I8, U8, I16, U16, I32, U32, I64, U64, F32])
    }
}
/// Cloneable handle to a running plugin, returned by `create_plugin_runtime`.
#[derive(Clone)]
pub struct PluginRuntimeHandle {
port_runtime: PortRuntimeHandle,
array_sender: NDArraySender,
array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
port_name: String,
/// Parameter indices of the standard array parameters on the plugin's port.
pub ndarray_params: NDArrayDriverParams,
/// Parameter indices of the plugin base parameters on the plugin's port.
pub plugin_params: PluginBaseParams,
}
impl PluginRuntimeHandle {
/// Runtime handle of the plugin's asyn port.
pub fn port_runtime(&self) -> &PortRuntimeHandle {
&self.port_runtime
}
/// Sender used to feed arrays into this plugin's input queue.
pub fn array_sender(&self) -> &NDArraySender {
&self.array_sender
}
/// Shared output object holding this plugin's downstream senders.
pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
&self.array_output
}
/// Name of the plugin's port.
pub fn port_name(&self) -> &str {
&self.port_name
}
}
/// Creates a plugin runtime whose port has a single asyn address.
///
/// Shorthand for `create_plugin_runtime_multi_addr` with `max_addr = 1`.
/// Returns the plugin handle and the join handle of the plugin's data thread.
///
/// # Panics
/// Panics when the port driver cannot be created or the data thread cannot be spawned.
pub fn create_plugin_runtime<P: NDPluginProcess>(
port_name: &str,
processor: P,
pool: Arc<NDArrayPool>,
queue_size: usize,
ndarray_port: &str,
wiring: Arc<WiringRegistry>,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
create_plugin_runtime_multi_addr(
port_name,
processor,
pool,
queue_size,
ndarray_port,
wiring,
1,
)
}
/// Creates the port driver, input queue and data thread for a plugin whose
/// port exposes `max_addr` asyn addresses.
///
/// Returns the plugin handle and the join handle of the data thread
/// (named `plugin-data-<port_name>`).
///
/// # Panics
/// Panics when the port driver cannot be created or the data thread cannot be spawned.
pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
port_name: &str,
mut processor: P,
pool: Arc<NDArrayPool>,
queue_size: usize,
ndarray_port: &str,
wiring: Arc<WiringRegistry>,
max_addr: usize,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
// Parameter writes travel from the port driver to the data thread over this channel.
let (param_tx, param_rx) =
tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
// Query the processor before it is moved into the shared state below.
let plugin_type_name = processor.plugin_type().to_string();
let compression_aware = processor.compression_aware();
let array_data = processor.array_data_handle();
let driver = PluginPortDriver::new(
port_name,
&plugin_type_name,
queue_size,
ndarray_port,
max_addr,
param_tx,
&mut processor,
array_data,
)
.expect("failed to create plugin port driver");
// Copy the parameter indices out before the driver is handed to the port runtime.
let ndarray_params = driver.ndarray_params;
let plugin_params = driver.plugin_params;
let std_array_data_param = driver.std_array_data_param;
// The port actor's join handle is not kept.
let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
let port_handle = port_runtime.port_handle().clone();
let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
// Both flags start out false; the data loop updates them on parameter writes.
let enabled = Arc::new(AtomicBool::new(false));
let blocking_mode = Arc::new(AtomicBool::new(false));
let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
let array_output_for_handle = array_output.clone();
// Register this plugin's output with the wiring registry for its addresses.
wiring.register_output_addrs(port_name, max_addr, array_output.clone());
// The dropped-array counter is shared with the input sender.
let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
processor,
output: array_output,
pool,
ndarray_params,
plugin_params,
port_handle,
array_counter: 0,
std_array_data_param,
min_callback_time: 0.0,
last_process_time: None,
sort_mode: 0,
sort_time: 0.0,
sort_size: 10,
sort_buffer: SortBuffer::new(),
dropped_arrays: dropped_arrays_counter,
compression_aware,
max_byte_rate: 0.0,
throttler: super::throttler::Throttler::new(0.0),
prev_input_array: None,
dims_prev: Vec::new(),
nd_array_addr: 0,
max_threads: 1,
num_threads: 1,
}));
// The data thread and the sender share the same enable/blocking flags.
let data_enabled = enabled.clone();
let data_blocking = blocking_mode.clone();
let mut array_sender = array_sender;
array_sender.set_mode_flags(enabled, blocking_mode);
let sender_port_name = port_name.to_string();
let initial_upstream = ndarray_port.to_string();
let data_jh = thread::Builder::new()
.name(format!("plugin-data-{port_name}"))
.spawn(move || {
plugin_data_loop(
shared,
array_rx,
param_rx,
plugin_params,
ndarray_params.array_counter,
data_enabled,
data_blocking,
sender_port_name,
initial_upstream,
wiring,
);
})
.expect("failed to spawn plugin data thread");
let handle = PluginRuntimeHandle {
port_runtime,
array_sender,
array_output: array_output_for_handle,
port_name: port_name.to_string(),
ndarray_params,
plugin_params,
};
(handle, data_jh)
}
/// Builds the address-0 updates reporting the input queue state:
/// `queue_size` receives `max_capacity` and `queue_use` receives `free`.
fn queue_status_batch(
    plugin_params: &PluginBaseParams,
    max_capacity: usize,
    free: i32,
) -> ParamBatch {
    use asyn_rs::request::ParamSetValue;
    let size_update = ParamSetValue::Int32 {
        reason: plugin_params.queue_size,
        addr: 0,
        value: max_capacity as i32,
    };
    let use_update = ParamSetValue::Int32 {
        reason: plugin_params.queue_use,
        addr: 0,
        value: free,
    };
    ParamBatch {
        addr0: vec![size_update, use_update],
        extra: std::collections::HashMap::new(),
    }
}
/// Writes a single integer parameter at address 0 back to the port, used to
/// publish a clamped value. Any error from the port is ignored.
async fn clamp_writeback(port: &PortHandle, reason: usize, value: i32) {
    use asyn_rs::request::ParamSetValue;
    let update = ParamSetValue::Int32 {
        reason,
        addr: 0,
        value,
    };
    let _ = port.set_params_and_notify(0, vec![update]).await;
}
fn plugin_data_loop<P: NDPluginProcess>(
shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
mut array_rx: NDArrayReceiver,
mut param_rx: tokio::sync::mpsc::UnboundedReceiver<(usize, i32, ParamChangeValue)>,
plugin_params: PluginBaseParams,
array_counter_reason: usize,
enabled: Arc<AtomicBool>,
blocking_mode: Arc<AtomicBool>,
sender_port_name: String,
initial_upstream: String,
wiring: Arc<WiringRegistry>,
) {
let enable_callbacks_reason = plugin_params.enable_callbacks;
let blocking_callbacks_reason = plugin_params.blocking_callbacks;
let min_callback_time_reason = plugin_params.min_callback_time;
let sort_mode_reason = plugin_params.sort_mode;
let sort_time_reason = plugin_params.sort_time;
let sort_size_reason = plugin_params.sort_size;
let nd_array_port_reason = plugin_params.nd_array_port;
let nd_array_addr_reason = plugin_params.nd_array_addr;
let process_plugin_reason = plugin_params.process_plugin;
let max_byte_rate_reason = plugin_params.max_byte_rate;
let num_threads_reason = plugin_params.num_threads;
let max_threads_reason = plugin_params.max_threads;
let mut current_upstream = initial_upstream;
let mut current_addr: i32 = 0;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
let mut sort_flush_active = false;
let mut last_queue_free: Option<i32> = None;
loop {
tokio::select! {
msg = array_rx.recv_msg() => {
match msg {
Some(msg) => {
if !enabled.load(Ordering::Acquire) {
continue;
}
let (process_output, senders, port) = {
let mut guard = shared.lock();
let compressed = msg.array.codec.is_some();
let output = if compressed && !guard.compression_aware {
guard
.dropped_arrays
.fetch_add(1, Ordering::AcqRel);
Some(guard.dropped_arrays_only_batch())
} else {
guard.process_and_publish(&msg.array)
};
let senders = guard.output.lock().senders_clone();
let port = guard.port_handle.clone();
(output, senders, port)
};
let max_cap = array_rx.max_capacity();
let free = max_cap.saturating_sub(array_rx.pending()) as i32;
let queue_batch = if last_queue_free != Some(free) {
last_queue_free = Some(free);
Some(queue_status_batch(&plugin_params, max_cap, free))
} else {
None
};
if let Some(po) = process_output {
po.publish_arrays(&senders).await;
po.batch.flush(&port).await;
}
if let Some(qb) = queue_batch {
qb.flush(&port).await;
}
}
None => break,
}
}
param = param_rx.recv() => {
match param {
Some((reason, addr, value)) => {
if reason == enable_callbacks_reason {
let on = value.as_i32() != 0;
enabled.store(on, Ordering::Release);
if !on {
shared.lock().prev_input_array = None;
}
}
if reason == blocking_callbacks_reason {
blocking_mode.store(value.as_i32() != 0, Ordering::Release);
}
if reason == min_callback_time_reason {
shared.lock().min_callback_time = value.as_f64();
}
if reason == max_byte_rate_reason {
let rate = value.as_f64();
let mut guard = shared.lock();
guard.max_byte_rate = rate;
guard.throttler.reset(rate);
}
if reason == max_threads_reason {
let (port, clamped, mt) = {
let mut guard = shared.lock();
guard.max_threads = value.as_i32().max(1);
let clamped =
guard.num_threads.clamp(1, guard.max_threads);
guard.num_threads = clamped;
(guard.port_handle.clone(), clamped, guard.max_threads)
};
clamp_writeback(&port, num_threads_reason, clamped).await;
clamp_writeback(&port, max_threads_reason, mt).await;
}
if reason == num_threads_reason {
let (port, clamped) = {
let mut guard = shared.lock();
let clamped =
value.as_i32().clamp(1, guard.max_threads.max(1));
guard.num_threads = clamped;
(guard.port_handle.clone(), clamped)
};
clamp_writeback(&port, num_threads_reason, clamped).await;
}
if reason == nd_array_addr_reason {
let new_addr = value.as_i32();
if new_addr != current_addr {
let old_key = upstream_key(¤t_upstream, current_addr);
let new_key = upstream_key(¤t_upstream, new_addr);
shared.lock().nd_array_addr = new_addr;
match wiring.rewire_by_name(
&sender_port_name,
&old_key,
&new_key,
) {
Ok(()) => current_addr = new_addr,
Err(e) => {
eprintln!("NDArrayAddr reconnect failed: {e}");
shared.lock().nd_array_addr = current_addr;
}
}
}
}
if reason == process_plugin_reason && value.as_i32() != 0 {
let (process_output, senders, port) = {
let mut guard = shared.lock();
let output = guard.process_plugin();
let senders = guard.output.lock().senders_clone();
let port = guard.port_handle.clone();
(output, senders, port)
};
if let Some(po) = process_output {
po.publish_arrays(&senders).await;
po.batch.flush(&port).await;
} else {
eprintln!(
"plugin {sender_port_name}: ProcessPlugin \
requested but no input array cached"
);
}
}
if reason == array_counter_reason {
shared.lock().array_counter = value.as_i32();
}
if reason == sort_mode_reason {
let mode = value.as_i32();
let flush_work = {
let mut guard = shared.lock();
guard.sort_mode = mode;
if mode == 0 {
let output = guard.flush_sort_buffer();
let senders = guard.output.lock().senders_clone();
let port = guard.port_handle.clone();
sort_flush_active = false;
Some((output, senders, port))
} else {
sort_flush_active = guard.sort_time > 0.0;
if sort_flush_active {
let dur = std::time::Duration::from_secs_f64(guard.sort_time);
sort_flush_interval = tokio::time::interval(dur);
}
None
}
};
if let Some((output, senders, port)) = flush_work {
output.publish_arrays(&senders).await;
output.batch.flush(&port).await;
}
}
if reason == sort_time_reason {
let t = value.as_f64();
let mut guard = shared.lock();
guard.sort_time = t;
if guard.sort_mode != 0 && t > 0.0 {
sort_flush_active = true;
let dur = std::time::Duration::from_secs_f64(t);
sort_flush_interval = tokio::time::interval(dur);
} else {
sort_flush_active = false;
}
drop(guard);
}
if reason == sort_size_reason {
shared.lock().sort_size = value.as_i32();
}
if reason == nd_array_port_reason {
if let Some(new_port) = value.as_string() {
if new_port != current_upstream {
let old_key =
upstream_key(¤t_upstream, current_addr);
let new_key = upstream_key(new_port, current_addr);
match wiring.rewire_by_name(
&sender_port_name,
&old_key,
&new_key,
) {
Ok(()) => current_upstream = new_port.to_string(),
Err(e) => {
eprintln!("NDArrayPort rewire failed: {e}")
}
}
}
}
}
let snapshot = PluginParamSnapshot {
enable_callbacks: enabled.load(Ordering::Acquire),
reason,
addr,
value,
};
let (process_output, senders, port) = {
let mut guard = shared.lock();
let t0 = std::time::Instant::now();
let result = guard.processor.on_param_change(reason, &snapshot);
let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
} else {
None
};
let senders = guard.output.lock().senders_clone();
(output, senders, guard.port_handle.clone())
};
if let Some(po) = process_output {
po.publish_arrays(&senders).await;
po.batch.flush(&port).await;
}
}
None => break,
}
}
_ = sort_flush_interval.tick(), if sort_flush_active => {
let (output, senders, port) = {
let mut guard = shared.lock();
let output = guard.tick_sort_buffer();
let senders = guard.output.lock().senders_clone();
let port = guard.port_handle.clone();
(output, senders, port)
};
output.publish_arrays(&senders).await;
output.batch.flush(&port).await;
}
}
}
});
}
/// Subscribes `downstream_sender` to the array output of `upstream`.
///
/// The sender is appended to the upstream handle's output while that output's
/// lock is held; the lock is released when this function returns.
pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
    let upstream_output = upstream.array_output();
    let mut subscribers = upstream_output.lock();
    subscribers.add(downstream_sender);
}
/// Creates a plugin port plus its data-processing thread, publishing through
/// the caller-supplied `output`.
///
/// The function:
/// 1. builds a [`PluginPortDriver`] for `port_name` and starts its port runtime,
/// 2. creates the input array channel with capacity `queue_size`,
/// 3. registers `output` with `wiring` under `port_name`,
/// 4. spawns a thread named `plugin-data-{port_name}` running `plugin_data_loop`.
///
/// Both mode flags handed to the array sender (enabled / blocking) start out
/// `false`.
///
/// # Returns
/// The runtime handle for the new plugin and the join handle of the data thread.
///
/// # Panics
/// Panics if the port driver cannot be created or the data thread cannot be
/// spawned.
pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
    port_name: &str,
    mut processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    output: NDArrayOutput,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    // The sending half goes to the port driver, the receiving half to the data loop.
    let (param_tx, param_rx) =
        tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();

    // Query the processor before the driver constructor borrows it mutably.
    let type_name = processor.plugin_type().to_string();
    let compression_aware = processor.compression_aware();
    let array_data = processor.array_data_handle();

    // NOTE(review): the literal `1` is presumably the address count of a
    // single-address port — confirm against `PluginPortDriver::new`.
    let driver = PluginPortDriver::new(
        port_name,
        &type_name,
        queue_size,
        ndarray_port,
        1,
        param_tx,
        &mut processor,
        array_data,
    )
    .expect("failed to create plugin port driver");

    // Copy the parameter tables out before the driver is moved into the runtime.
    let (ndarray_params, plugin_params, std_array_data_param) = (
        driver.ndarray_params,
        driver.plugin_params,
        driver.std_array_data_param,
    );
    let array_counter_reason = ndarray_params.array_counter;

    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
    let port_handle = port_runtime.port_handle().clone();

    // Input side: the array channel and the mode flags shared with its sender.
    let (mut array_sender, array_rx) = ndarray_channel(port_name, queue_size);
    let enabled_flag = Arc::new(AtomicBool::new(false));
    let blocking_flag = Arc::new(AtomicBool::new(false));
    array_sender.set_mode_flags(Arc::clone(&enabled_flag), Arc::clone(&blocking_flag));
    let dropped_arrays = array_sender.dropped_arrays_counter().clone();

    // Output side: one shared output, visible to the registry, the processor
    // state and the returned handle.
    let shared_output = Arc::new(parking_lot::Mutex::new(output));
    wiring.register_output(port_name, Arc::clone(&shared_output));
    let handle_output = Arc::clone(&shared_output);

    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
        processor,
        output: shared_output,
        pool,
        ndarray_params,
        plugin_params,
        port_handle,
        array_counter: 0,
        std_array_data_param,
        min_callback_time: 0.0,
        last_process_time: None,
        sort_mode: 0,
        sort_time: 0.0,
        sort_size: 10,
        sort_buffer: SortBuffer::new(),
        dropped_arrays,
        compression_aware,
        max_byte_rate: 0.0,
        throttler: super::throttler::Throttler::new(0.0),
        prev_input_array: None,
        dims_prev: Vec::new(),
        nd_array_addr: 0,
        max_threads: 1,
        num_threads: 1,
    }));

    // Owned copies of the names, since the data thread outlives the borrowed `&str`s.
    let loop_port_name = port_name.to_string();
    let loop_upstream = ndarray_port.to_string();

    let data_jh = thread::Builder::new()
        .name(format!("plugin-data-{port_name}"))
        .spawn(move || {
            plugin_data_loop(
                shared,
                array_rx,
                param_rx,
                plugin_params,
                array_counter_reason,
                enabled_flag,
                blocking_flag,
                loop_port_name,
                loop_upstream,
                wiring,
            );
        })
        .expect("failed to spawn plugin data thread");

    (
        PluginRuntimeHandle {
            port_runtime,
            array_sender,
            array_output: handle_output,
            port_name: port_name.to_string(),
            ndarray_params,
            plugin_params,
        },
        data_jh,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::ndarray_channel;
    use std::future::Future;
    use std::time::Duration;

    /// Upper bound on how long a test waits for an array that is expected to
    /// arrive. A regression then fails the test instead of hanging it forever.
    const RECV_DEADLINE: Duration = Duration::from_secs(5);

    /// Processor that forwards a copy of every input array.
    struct PassthroughProcessor;
    impl NDPluginProcess for PassthroughProcessor {
        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            ProcessResult::arrays(vec![Arc::new(array.clone())])
        }
        fn plugin_type(&self) -> &str {
            "Passthrough"
        }
    }

    /// Processor that counts input arrays and emits nothing.
    struct SinkProcessor {
        count: usize,
    }
    impl NDPluginProcess for SinkProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count += 1;
            ProcessResult::empty()
        }
        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }

    /// Builds a 4-element `UInt8` array carrying the given `unique_id`.
    fn make_test_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Fresh, empty wiring registry for a single test.
    fn test_wiring() -> Arc<WiringRegistry> {
        Arc::new(WiringRegistry::new())
    }

    /// Array pool shared by the runtime tests.
    fn test_pool() -> Arc<NDArrayPool> {
        Arc::new(NDArrayPool::new(1_000_000))
    }

    /// Builds an output that fans out to every sender in `senders`.
    fn output_to(senders: impl IntoIterator<Item = NDArraySender>) -> NDArrayOutput {
        let mut output = NDArrayOutput::new();
        for sender in senders {
            output.add(sender);
        }
        output
    }

    /// Writes an int32 parameter at address 0 through the plugin's port handle.
    fn write_int32(handle: &PluginRuntimeHandle, reason: usize, value: i32) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(reason, 0, value)
            .unwrap();
    }

    /// Writes a float64 parameter at address 0 through the plugin's port handle.
    fn write_float64(handle: &PluginRuntimeHandle, reason: usize, value: f64) {
        handle
            .port_runtime()
            .port_handle()
            .write_float64_blocking(reason, 0, value)
            .unwrap();
    }

    /// Sleeps so the data thread can pick up a preceding write or array.
    fn settle(millis: u64) {
        std::thread::sleep(Duration::from_millis(millis));
    }

    /// Drives `pending` on a throwaway current-thread runtime for at most
    /// `limit`. Returns `None` when the limit elapses first.
    fn wait_up_to<F: Future>(limit: Duration, pending: F) -> Option<F::Output> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        // `timeout` must be constructed inside the runtime, hence the async block.
        rt.block_on(async move { tokio::time::timeout(limit, pending).await })
            .ok()
    }

    /// Waits (bounded by `RECV_DEADLINE`) for a receive future to yield an item.
    ///
    /// Panics on timeout or when the channel is closed.
    fn recv_or_fail<T>(pending: impl Future<Output = Option<T>>) -> T {
        wait_up_to(RECV_DEADLINE, pending)
            .expect("timed out waiting for an array")
            .expect("array channel closed before an array arrived")
    }

    /// Turns EnableCallbacks on and gives the data thread a moment to see it.
    fn enable_callbacks(handle: &PluginRuntimeHandle) {
        write_int32(handle, handle.plugin_params.enable_callbacks, 1);
        settle(10);
    }

    /// Publishes `array` through `sender` from a helper thread that owns its
    /// own tokio runtime, and waits for the publish to finish.
    fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
        let sender = sender.clone();
        let jh = std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(sender.publish(array));
        });
        jh.join().unwrap();
    }

    /// An enabled passthrough plugin forwards an array to its downstream.
    #[test]
    fn test_passthrough_runtime() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "PASS1",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        send_array(handle.array_sender(), make_test_array(42));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 42);
    }

    /// Smoke test: a sink plugin accepts arrays without a downstream.
    #[test]
    fn test_sink_runtime() {
        let (handle, _data_jh) = create_plugin_runtime(
            "SINK1",
            SinkProcessor { count: 0 },
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        send_array(handle.array_sender(), make_test_array(1));
        send_array(handle.array_sender(), make_test_array(2));
        settle(50);
        assert_eq!(handle.port_name(), "SINK1");
    }

    /// The handle and its port runtime both report the configured port name.
    #[test]
    fn test_plugin_type_param() {
        let (handle, _data_jh) = create_plugin_runtime(
            "TYPE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        assert_eq!(handle.port_name(), "TYPE_TEST");
        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
    }

    /// Dropping the handle and every sender lets the data thread exit cleanly.
    #[test]
    fn test_shutdown_on_handle_drop() {
        let (handle, data_jh) = create_plugin_runtime(
            "SHUTDOWN_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        let sender = handle.array_sender().clone();
        drop(handle);
        drop(sender);
        let result = data_jh.join();
        assert!(result.is_ok());
    }

    /// A consumer wired to address 1 of a two-address port receives arrays.
    #[test]
    fn test_wire_to_nonzero_ndarray_addr() {
        use crate::plugin::wiring::upstream_key;
        let wiring = test_wiring();
        let (up_handle, _up_jh) = create_plugin_runtime_multi_addr(
            "UP_MULTI",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            wiring.clone(),
            2,
        );
        enable_callbacks(&up_handle);
        let addr0 = wiring.lookup_output("UP_MULTI");
        let addr1 = wiring.lookup_output(&upstream_key("UP_MULTI", 1));
        assert!(addr0.is_some(), "addr 0 output must be registered");
        assert!(
            addr1.is_some(),
            "addr 1 output must be registered for a max_addr=2 port"
        );
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWN_ADDR1", 10);
        wiring
            .rewire(&downstream_sender, "", &upstream_key("UP_MULTI", 1))
            .expect("wiring a consumer to NDArrayAddr=1 must succeed");
        send_array(up_handle.array_sender(), make_test_array(99));
        let received = recv_or_fail(downstream_rx.recv());
        assert_eq!(
            received.unique_id, 99,
            "consumer wired to NDArrayAddr=1 must receive upstream arrays"
        );
    }

    /// Passthrough in the default (non-blocking) mode delivers the array.
    #[test]
    fn test_nonblocking_passthrough() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "NB_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        send_array(handle.array_sender(), make_test_array(42));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 42);
    }

    /// Arrays keep flowing when BlockingCallbacks is switched on and off again.
    #[test]
    fn test_blocking_to_nonblocking_switch() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "SWITCH_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        write_int32(&handle, handle.plugin_params.blocking_callbacks, 1);
        settle(50);
        send_array(handle.array_sender(), make_test_array(1));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 1);

        write_int32(&handle, handle.plugin_params.blocking_callbacks, 0);
        settle(50);
        send_array(handle.array_sender(), make_test_array(2));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 2);
    }

    /// With EnableCallbacks off, nothing reaches the downstream.
    #[test]
    fn test_enable_callbacks_disables_processing() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "ENABLE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        write_int32(&handle, handle.plugin_params.enable_callbacks, 0);
        settle(50);
        send_array(handle.array_sender(), make_test_array(99));
        let result = wait_up_to(Duration::from_millis(100), downstream_rx.recv());
        assert!(
            result.is_none(),
            "should not receive array when callbacks disabled"
        );
    }

    /// Every downstream attached to the output receives the array.
    #[test]
    fn test_downstream_receives_multiple() {
        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "DS_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([ds1, ds2]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        send_array(handle.array_sender(), make_test_array(77));
        let r1 = recv_or_fail(rx1.recv());
        let r2 = recv_or_fail(rx2.recv());
        assert_eq!(r1.unique_id, 77);
        assert_eq!(r2.unique_id, 77);
    }

    /// Re-writing a parameter between two arrays does not disturb delivery.
    #[test]
    fn test_param_updates_after_send() {
        struct ParamTracker;
        impl NDPluginProcess for ParamTracker {
            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                ProcessResult::arrays(vec![Arc::new(array.clone())])
            }
            fn plugin_type(&self) -> &str {
                "ParamTracker"
            }
        }
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "PARAM_TEST",
            ParamTracker,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        send_array(handle.array_sender(), make_test_array(1));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 1);

        write_int32(&handle, handle.plugin_params.enable_callbacks, 1);
        settle(50);
        send_array(handle.array_sender(), make_test_array(2));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 2);
    }

    /// `drain_all` yields entries in ascending unique-id order.
    #[test]
    fn test_sort_buffer_reorders_by_unique_id() {
        let mut buf = SortBuffer::new();
        buf.insert(3, vec![make_test_array(3)], 10);
        buf.insert(1, vec![make_test_array(1)], 10);
        buf.insert(2, vec![make_test_array(2)], 10);
        assert_eq!(buf.len(), 3);
        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.prev_unique_id, 3);
    }

    /// `drain_ready` releases the contiguous run and holds entries behind a gap.
    #[test]
    fn test_sort_buffer_drain_ready_contiguous() {
        let mut buf = SortBuffer::new();
        buf.note_emitted(0);
        buf.insert(1, vec![make_test_array(1)], 10);
        buf.insert(2, vec![make_test_array(2)], 10);
        buf.insert(5, vec![make_test_array(5)], 10);
        let drained = buf.drain_ready(100.0);
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![1, 2], "contiguous run released; id=5 held by gap");
        assert_eq!(buf.len(), 1);
    }

    /// An entry stuck behind a gap is released once it is older than the deadline.
    #[test]
    fn test_sort_buffer_drain_ready_deadline() {
        let mut buf = SortBuffer::new();
        buf.note_emitted(1);
        buf.insert(5, vec![make_test_array(5)], 10);
        // Let the buffered entry age past the 10 ms deadline used below.
        std::thread::sleep(Duration::from_millis(30));
        let drained = buf.drain_ready(0.01);
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![5], "stale head released via deadline");
    }

    /// Emitting a lower id after a higher one bumps the disordered counter once.
    #[test]
    fn test_sort_buffer_detects_disordered_on_emit() {
        let mut buf = SortBuffer::new();
        buf.note_emitted(5);
        buf.note_emitted(3);
        assert_eq!(buf.disordered_arrays, 1);
        buf.note_emitted(4);
        assert_eq!(buf.disordered_arrays, 1);
    }

    /// Inserting past the size limit is rejected and counted as a drop.
    #[test]
    fn test_sort_buffer_drops_when_full() {
        let mut buf = SortBuffer::new();
        assert!(buf.insert(1, vec![make_test_array(1)], 2));
        assert!(buf.insert(2, vec![make_test_array(2)], 2));
        assert!(!buf.insert(3, vec![make_test_array(3)], 2));
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped_output_arrays, 1);
    }

    /// In sort mode, in-order arrays pass straight through and a swapped pair
    /// is delivered in id order.
    #[test]
    fn test_sort_mode_runtime_integration() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "SORT_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        write_int32(&handle, handle.plugin_params.sort_size, 10);
        write_float64(&handle, handle.plugin_params.sort_time, 0.1);
        write_int32(&handle, handle.plugin_params.sort_mode, 1);
        settle(50);

        send_array(handle.array_sender(), make_test_array(1));
        send_array(handle.array_sender(), make_test_array(2));
        send_array(handle.array_sender(), make_test_array(3));
        let fast = wait_up_to(Duration::from_millis(50), downstream_rx.recv());
        assert!(
            fast.is_some(),
            "in-order arrays must be emitted immediately, not buffered"
        );
        assert_eq!(fast.unwrap().unwrap().unique_id, 1);
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 2);
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 3);

        send_array(handle.array_sender(), make_test_array(5));
        send_array(handle.array_sender(), make_test_array(4));
        settle(50);
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 4);
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 5);
    }

    /// A small MaxByteRate lets at least one array through but not all five.
    #[test]
    fn test_throttle_drops_output_arrays() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "THROTTLE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        write_float64(&handle, handle.plugin_params.max_byte_rate, 8.0);
        settle(20);
        for id in 1..=5 {
            send_array(handle.array_sender(), make_test_array(id));
        }
        settle(50);

        // Count arrays until the downstream stays quiet for 20 ms.
        let mut received = 0;
        while matches!(
            wait_up_to(Duration::from_millis(20), downstream_rx.recv()),
            Some(Some(_))
        ) {
            received += 1;
        }
        assert!(
            received < 5,
            "throttle must drop some arrays (got {received})"
        );
        assert!(received >= 1, "first array within budget must pass");
    }

    /// Writing ProcessPlugin re-emits the most recent input array.
    #[test]
    fn test_process_plugin_reprocesses_last_input() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "PROCESS_PLUGIN_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        send_array(handle.array_sender(), make_test_array(7));
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 7);

        write_int32(&handle, handle.plugin_params.process_plugin, 1);
        let reprocessed = recv_or_fail(downstream_rx.recv());
        assert_eq!(
            reprocessed.unique_id, 7,
            "ProcessPlugin re-emits last input"
        );
    }

    /// With a 10 s MinCallbackTime, only the first of two arrays is delivered.
    #[test]
    fn test_min_callback_time_drop_counts() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "MIN_CB_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        write_float64(&handle, handle.plugin_params.min_callback_time, 10.0);
        settle(20);
        send_array(handle.array_sender(), make_test_array(1));
        send_array(handle.array_sender(), make_test_array(2));
        settle(50);
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 1);
        let second = wait_up_to(Duration::from_millis(50), downstream_rx.recv());
        assert!(
            second.is_none(),
            "second array throttled out by MinCallbackTime"
        );
    }

    /// ProcessPlugin replays the last array that was processed, not one that
    /// MinCallbackTime discarded.
    #[test]
    fn test_process_plugin_skips_throttled_input() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "PROCESS_THROTTLE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        write_float64(&handle, handle.plugin_params.min_callback_time, 10.0);
        settle(20);
        send_array(handle.array_sender(), make_test_array(1));
        send_array(handle.array_sender(), make_test_array(2));
        settle(50);
        assert_eq!(recv_or_fail(downstream_rx.recv()).unique_id, 1);

        write_float64(&handle, handle.plugin_params.min_callback_time, 0.0);
        settle(20);
        write_int32(&handle, handle.plugin_params.process_plugin, 1);
        let reprocessed = recv_or_fail(downstream_rx.recv());
        assert_eq!(
            reprocessed.unique_id, 1,
            "ProcessPlugin must re-inject the last processed array (1), not the throttled array (2)"
        );
    }

    /// A compressed array is dropped by a plugin that is not compression-aware.
    #[test]
    fn test_g3_compressed_array_dropped_on_non_aware_plugin() {
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "G3_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output_to([downstream_sender]),
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        let mut compressed = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        compressed.unique_id = 1;
        compressed.codec = Some(crate::codec::Codec {
            name: crate::codec::CodecName::JPEG,
            compressed_size: 16,
            level: 0,
            shuffle: 0,
            compressor: 0,
        });
        send_array(handle.array_sender(), Arc::new(compressed));
        send_array(handle.array_sender(), make_test_array(2));
        let r = recv_or_fail(downstream_rx.recv());
        assert_eq!(
            r.unique_id, 2,
            "compressed array dropped; only the raw array reaches downstream"
        );
    }

    /// Arrays sent to a full queue are counted by the sender's dropped counter.
    #[test]
    fn test_drop_on_full_increments_dropped_counter() {
        /// Processor slow enough (200 ms per array) to back up a size-1 queue.
        struct SlowProcessor;
        impl NDPluginProcess for SlowProcessor {
            fn process_array(&mut self, _a: &NDArray, _p: &NDArrayPool) -> ProcessResult {
                std::thread::sleep(Duration::from_millis(200));
                ProcessResult::empty()
            }
            fn plugin_type(&self) -> &str {
                "Slow"
            }
        }
        let (downstream_handle, _ds_jh) = create_plugin_runtime(
            "B1_DOWNSTREAM",
            SlowProcessor,
            test_pool(),
            1,
            "",
            test_wiring(),
        );
        enable_callbacks(&downstream_handle);
        let ds_sender = downstream_handle.array_sender().clone();
        let dropped = ds_sender.dropped_arrays_counter().clone();
        for id in 1..=4 {
            send_array(&ds_sender, make_test_array(id));
        }
        assert!(
            dropped.load(Ordering::Acquire) >= 1,
            "arrays dropped on a full queue must be counted (got {})",
            dropped.load(Ordering::Acquire)
        );
    }

    /// `copy_ccast` narrows like a C cast (keeps the low bits), while
    /// `copy_convert` saturates integers and truncates floats toward zero.
    #[test]
    fn test_cross_width_narrowing_array_read_truncates() {
        let mut out = [0i8; 1];
        let n = copy_ccast(&[300u16], &mut out);
        assert_eq!(n, 1);
        assert_eq!(out[0], 44, "(epicsInt8)(epicsUInt16)300 == 44 (low 8 bits)");

        let mut sat = [0i8; 1];
        copy_convert(&[300u16], &mut sat);
        assert_eq!(sat[0], 127, "f64 round-trip saturates — the wrong behavior");

        let mut out2 = [0i8; 1];
        copy_ccast(&[0x1234_5678i32], &mut out2);
        assert_eq!(out2[0], 0x78);

        let mut out3 = [0i8; 1];
        copy_ccast(&[-1i32], &mut out3);
        assert_eq!(out3[0], -1);

        let mut out4 = [0i8; 1];
        copy_ccast(&[255u16], &mut out4);
        assert_eq!(out4[0], -1);

        let mut out5 = [0i32; 1];
        copy_ccast(&[0x0000_0001_0000_002Ai64], &mut out5);
        assert_eq!(out5[0], 42);

        let mut out6 = [0i16; 1];
        copy_ccast(&[70000u32], &mut out6);
        assert_eq!(out6[0], 4464);

        let mut out7 = [0i8; 1];
        copy_ccast(&[255u8], &mut out7);
        assert_eq!(out7[0], -1);

        let mut fout = [0i32; 1];
        copy_convert(&[42.9f64], &mut fout);
        assert_eq!(fout[0], 42, "f64 -> i32 truncates toward zero");
    }
}