use std::sync::Arc;
use asyn_rs::error::AsynResult;
use asyn_rs::port::{PortDriverBase, PortFlags};
use crate::color::NDColorMode;
use crate::ndarray::NDArray;
use crate::ndarray_pool::NDArrayPool;
use crate::params::ad_driver::ADDriverParams;
use crate::plugin::channel::{NDArrayOutput, NDArraySender, QueuedArrayCounter};
use super::{ADStatus, ImageMode, ShutterMode};
/// Bundles a port driver, its parameter indices, an array pool and the
/// downstream array output into the state a detector driver builds on.
pub struct ADDriverBase {
/// Underlying port driver; every parameter read and write goes through it.
pub port_base: PortDriverBase,
/// Parameter indices produced by `ADDriverParams::create` in `new`.
pub params: ADDriverParams,
/// Shared array pool, created in `new` with the `max_memory` limit.
pub pool: Arc<NDArrayPool>,
/// Collection of downstream senders registered through `connect_downstream`.
pub array_output: NDArrayOutput,
/// Counter handed to every connected sender and consulted by `set_acquire`.
pub queued_counter: Arc<QueuedArrayCounter>,
/// Most recent array given to `prepare_array`; `None` until the first call.
/// `write_int32_pool` passes it on as a template.
pub last_array: Option<Arc<NDArray>>,
}
impl ADDriverBase {
/// Creates the driver base for `port_name` and seeds its default parameters.
///
/// A port driver with `can_block` set is created, the parameter set is
/// registered through `ADDriverParams::create`, the initial values are
/// written (always with `0` as the second setter argument, presumably the
/// asyn address), and finally the array pool is allocated.
///
/// # Parameters
/// - `port_name`: name given to the port; also stored in `port_name_self`.
/// - `max_size_x` / `max_size_y`: written to both the maximum-size and the
///   current-size parameters.
/// - `max_memory`: limit passed to `NDArrayPool::new`; published in
///   `pool_max_memory` after division by 1_048_576.
///
/// # Errors
/// Propagates any error from parameter creation or from a parameter write.
pub fn new(
    port_name: &str,
    max_size_x: i32,
    max_size_y: i32,
    max_memory: usize,
) -> AsynResult<Self> {
    const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
    const BYTES_PER_MIB: f64 = 1_048_576.0;

    let flags = PortFlags {
        can_block: true,
        ..Default::default()
    };
    let mut port_base = PortDriverBase::new(port_name, 1, flags);
    let params = ADDriverParams::create(&mut port_base)?;

    // Identification strings; both version parameters report this crate's version.
    port_base.set_string_param(params.base.port_name_self, 0, String::from(port_name))?;
    port_base.set_string_param(params.base.ad_core_version, 0, String::from(CRATE_VERSION))?;
    port_base.set_string_param(params.base.driver_version, 0, String::from(CRATE_VERSION))?;
    port_base.set_string_param(params.base.codec, 0, String::new())?;

    // Geometry and acquisition counts: the region starts out as the full sensor.
    let acquisition_defaults = [
        (params.max_size_x, max_size_x),
        (params.max_size_y, max_size_y),
        (params.size_x, max_size_x),
        (params.size_y, max_size_y),
        (params.bin_x, 1),
        (params.bin_y, 1),
        (params.image_mode, ImageMode::Single as i32),
        (params.num_images, 1),
        (params.num_exposures, 1),
    ];
    for (index, value) in acquisition_defaults {
        port_base.set_int32_param(index, 0, value)?;
    }

    port_base.set_float64_param(params.acquire_time, 0, 1.0)?;
    port_base.set_float64_param(params.acquire_period, 0, 1.0)?;

    port_base.set_int32_param(params.status, 0, ADStatus::Idle as i32)?;
    port_base.set_string_param(params.status_message, 0, String::from("Idle"))?;

    // NOTE(review): data type 1 is presumably the UInt8 code - confirm against NDDataType.
    let array_format_defaults = [
        (params.base.data_type, 1),
        (params.base.color_mode, NDColorMode::Mono as i32),
        (params.base.array_callbacks, 1),
    ];
    for (index, value) in array_format_defaults {
        port_base.set_int32_param(index, 0, value)?;
    }

    port_base.set_float64_param(
        params.base.pool_max_memory,
        0,
        max_memory as f64 / BYTES_PER_MIB,
    )?;

    // No array has been produced yet, so every reported array size is zero.
    let array_size_params = [
        params.base.array_size_x,
        params.base.array_size_y,
        params.base.array_size_z,
        params.base.array_size,
    ];
    for index in array_size_params {
        port_base.set_int32_param(index, 0, 0)?;
    }

    port_base.set_float64_param(params.gain, 0, 1.0)?;
    port_base.set_int32_param(params.shutter_mode, 0, ShutterMode::None as i32)?;
    port_base.set_float64_param(params.temperature, 0, 25.0)?;
    port_base.set_float64_param(params.temperature_actual, 0, 25.0)?;

    let pool = Arc::new(NDArrayPool::new(max_memory));
    let array_output = NDArrayOutput::new();
    let queued_counter = Arc::new(QueuedArrayCounter::new());

    Ok(Self {
        port_base,
        params,
        pool,
        array_output,
        queued_counter,
        last_array: None,
    })
}
pub fn connect_downstream(&mut self, mut sender: NDArraySender) {
sender.set_queued_counter(self.queued_counter.clone());
self.array_output.add(sender);
}
/// Records `array` as the latest frame and decides whether it is published.
///
/// Steps, in order:
/// 1. increments `array_counter` (wrapping around on `i32` overflow rather
///    than panicking when overflow checks are enabled),
/// 2. writes the array-derived parameters via `write_array_params`,
/// 3. stores the array in `last_array`,
/// 4. refreshes the pool statistics: allocated bytes divided by 1_048_576,
///    free buffer count and allocated buffer count,
/// 5. when `array_callbacks` is non-zero, stores the array in the
///    `ndarray_data` generic-pointer parameter,
/// 6. calls `call_param_callbacks(0)`.
///
/// # Returns
/// `Ok(Some(array))` when `array_callbacks` is non-zero, handing the array
/// back to the caller; `Ok(None)` when callbacks are disabled.
///
/// # Errors
/// Propagates any error from a parameter read/write or from the callbacks.
pub fn prepare_array(&mut self, array: Arc<NDArray>) -> AsynResult<Option<Arc<NDArray>>> {
    let previous = self
        .port_base
        .get_int32_param(self.params.base.array_counter, 0)?;
    // A long-running acquisition must roll the counter over, not abort the driver.
    let counter = previous.wrapping_add(1);
    self.port_base
        .set_int32_param(self.params.base.array_counter, 0, counter)?;

    crate::driver::ndarray_driver::write_array_params(
        &mut self.port_base,
        &self.params.base,
        &array,
    )?;
    self.last_array = Some(Arc::clone(&array));

    self.port_base.set_float64_param(
        self.params.base.pool_used_memory,
        0,
        self.pool.allocated_bytes() as f64 / 1_048_576.0,
    )?;
    self.port_base.set_int32_param(
        self.params.base.pool_free_buffers,
        0,
        self.pool.num_free_buffers() as i32,
    )?;
    self.port_base.set_int32_param(
        self.params.base.pool_alloc_buffers,
        0,
        self.pool.num_alloc_buffers() as i32,
    )?;

    let callbacks_enabled = self
        .port_base
        .get_int32_param(self.params.base.array_callbacks, 0)?
        != 0;
    let to_publish = if callbacks_enabled {
        self.port_base.set_generic_pointer_param(
            self.params.base.ndarray_data,
            0,
            array.clone() as Arc<dyn std::any::Any + Send + Sync>,
        )?;
        Some(array)
    } else {
        None
    };

    // Parameter callbacks fire whether or not the array itself is published.
    self.port_base.call_param_callbacks(0)?;
    Ok(to_publish)
}
/// Applies an open/close request according to the current `shutter_mode`.
///
/// Only `ShutterMode::EpicsOnly` has an effect here: `shutter_control_epics`
/// is set to 1 (open) or 0 (close), parameter callbacks are fired, and the
/// calling thread sleeps for `shutter_open_delay - shutter_close_delay`
/// seconds when that difference is positive. `ShutterMode::None`,
/// `ShutterMode::DetectorOnly` and unrecognised mode values do nothing.
///
/// # Errors
/// Propagates any error from a parameter read/write or from the callbacks.
pub fn set_shutter(&mut self, open: bool) -> AsynResult<()> {
    let raw_mode = self
        .port_base
        .get_int32_param(self.params.shutter_mode, 0)?;

    match ShutterMode::from_i32(raw_mode) {
        Some(ShutterMode::EpicsOnly) => {
            let open_delay = self
                .port_base
                .get_float64_param(self.params.shutter_open_delay, 0)?;
            let close_delay = self
                .port_base
                .get_float64_param(self.params.shutter_close_delay, 0)?;

            let command = i32::from(open);
            self.port_base
                .set_int32_param(self.params.shutter_control_epics, 0, command)?;
            self.port_base.call_param_callbacks(0)?;

            // Block only for a strictly positive difference; zero, negative
            // and NaN values skip the sleep.
            let pause = open_delay - close_delay;
            if pause > 0.0 {
                std::thread::sleep(std::time::Duration::from_secs_f64(pause));
            }
        }
        // The base performs no action in detector mode.
        Some(ShutterMode::DetectorOnly) => {}
        Some(ShutterMode::None) | None => {}
    }

    Ok(())
}
/// Forwards an int32 write to the pool handler `handle_pool_write_int32`.
///
/// The handler receives the port driver, the base parameters, the pool,
/// `param_index`, and the last prepared array (if any) as a template.
/// The written value itself is not used.
///
/// # Returns
/// Whatever the handler returns; the tests in this file expect `true` for
/// pool parameters and `false` for an unrelated parameter.
pub fn write_int32_pool(&mut self, param_index: usize, _value: i32) -> AsynResult<bool> {
    // The fields are borrowed disjointly, so the template can be lent out
    // straight from `last_array`.
    crate::driver::ndarray_driver::handle_pool_write_int32(
        &mut self.port_base,
        &self.params.base,
        &self.pool,
        param_index,
        self.last_array.as_deref(),
    )
}
/// Writes `acquire` and keeps `acquire_busy` in step with it.
///
/// - Non-zero `value`: `acquire_busy` is set to 1.
/// - Zero `value`: `acquire_busy` is cleared unless `wait_for_plugins` is
///   non-zero and `queued_counter` is still above zero. A failed read of
///   `wait_for_plugins` is treated as 0.
///
/// `acquire` itself is always written last.
///
/// # Errors
/// Propagates any error from a parameter write.
pub fn set_acquire(&mut self, value: i32) -> AsynResult<()> {
    let busy_update = match value {
        0 => {
            let must_wait = self
                .port_base
                .get_int32_param(self.params.wait_for_plugins, 0)
                .unwrap_or(0)
                != 0;
            // The counter is only consulted when waiting was requested.
            if !must_wait || self.queued_counter.get() == 0 {
                Some(0)
            } else {
                None
            }
        }
        _ => Some(1),
    };

    if let Some(busy) = busy_update {
        self.port_base
            .set_int32_param(self.params.acquire_busy, 0, busy)?;
    }

    self.port_base
        .set_int32_param(self.params.acquire, 0, value)?;
    Ok(())
}
/// Writes `num_queued_arrays`, clearing `acquire_busy` once the queue is empty.
///
/// `acquire_busy` is set to 0 only when `value` is 0 and `acquire` reads
/// as 0. A failed read of `acquire` is treated as 0.
///
/// # Errors
/// Propagates any error from a parameter write.
pub fn set_num_queued_arrays(&mut self, value: i32) -> AsynResult<()> {
    // `acquire` is only read when the queue has actually drained.
    let release_busy = value == 0
        && self
            .port_base
            .get_int32_param(self.params.acquire, 0)
            .map_or(true, |acquire| acquire == 0);

    if release_busy {
        self.port_base
            .set_int32_param(self.params.acquire_busy, 0, 0)?;
    }

    self.port_base
        .set_int32_param(self.params.base.num_queued_arrays, 0, value)?;
    Ok(())
}
}
/// Implemented by port drivers that embed an [`ADDriverBase`].
pub trait ADDriver: asyn_rs::port::PortDriver {
/// Returns a shared reference to the embedded [`ADDriverBase`].
fn ad_base(&self) -> &ADDriverBase;
/// Returns an exclusive reference to the embedded [`ADDriverBase`].
fn ad_base_mut(&mut self) -> &mut ADDriverBase;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::ndarray_channel;

    /// Builds a driver base on port "TEST", panicking if construction fails.
    fn driver(max_size_x: i32, max_size_y: i32, max_memory: usize) -> ADDriverBase {
        ADDriverBase::new("TEST", max_size_x, max_size_y, max_memory).unwrap()
    }

    /// Allocates a `UInt8` array with the given dimensions from the driver's pool.
    fn alloc_u8(ad: &ADDriverBase, dims: Vec<NDDimension>) -> NDArray {
        ad.pool.alloc(dims, NDDataType::UInt8).unwrap()
    }

    /// Reads an int32 parameter, panicking if the read fails.
    fn int32(ad: &ADDriverBase, index: usize) -> i32 {
        ad.port_base.get_int32_param(index, 0).unwrap()
    }

    #[test]
    fn test_new_sets_initial_params() {
        let ad = driver(1024, 768, 50_000_000);

        assert_eq!(int32(&ad, ad.params.max_size_x), 1024);
        assert_eq!(int32(&ad, ad.params.max_size_y), 768);
        assert_eq!(int32(&ad, ad.params.size_x), 1024);
        assert_eq!(int32(&ad, ad.params.size_y), 768);
        assert_eq!(int32(&ad, ad.params.status), ADStatus::Idle as i32);
    }

    #[test]
    fn test_prepare_array_increments_counter() {
        let mut ad = driver(256, 256, 50_000_000);
        let frame = alloc_u8(&ad, vec![NDDimension::new(256), NDDimension::new(256)]);

        ad.prepare_array(Arc::new(frame)).unwrap();

        assert_eq!(int32(&ad, ad.params.base.array_counter), 1);
    }

    #[test]
    fn test_prepare_array_skips_output_when_callbacks_disabled() {
        let mut ad = driver(64, 64, 1_000_000);
        let (sender, _receiver) = ndarray_channel("DOWNSTREAM", 10);
        ad.connect_downstream(sender);
        ad.port_base
            .set_int32_param(ad.params.base.array_callbacks, 0, 0)
            .unwrap();
        let frame = alloc_u8(&ad, vec![NDDimension::new(64), NDDimension::new(64)]);

        ad.prepare_array(Arc::new(frame)).unwrap();

        // The counter still advances even though nothing is published.
        assert_eq!(int32(&ad, ad.params.base.array_counter), 1);
        let gp = ad
            .port_base
            .get_generic_pointer_param(ad.params.base.ndarray_data, 0)
            .unwrap();
        assert!(gp.downcast_ref::<NDArray>().is_none());
    }

    #[test]
    fn test_publish_sets_generic_pointer() {
        let mut ad = driver(8, 8, 1_000_000);
        let frame = alloc_u8(&ad, vec![NDDimension::new(8), NDDimension::new(8)]);
        let id = frame.unique_id;

        ad.prepare_array(Arc::new(frame)).unwrap();

        let gp = ad
            .port_base
            .get_generic_pointer_param(ad.params.base.ndarray_data, 0)
            .unwrap();
        let recovered = gp.downcast_ref::<NDArray>().unwrap();
        assert_eq!(recovered.unique_id, id);
    }

    #[test]
    fn test_shutter_detector_mode_is_noop_in_base() {
        let mut ad = driver(8, 8, 1_000_000);
        ad.port_base
            .set_int32_param(ad.params.shutter_mode, 0, ShutterMode::DetectorOnly as i32)
            .unwrap();
        // Sentinel value: it must survive the call untouched.
        ad.port_base
            .set_int32_param(ad.params.shutter_control, 0, 7)
            .unwrap();

        ad.set_shutter(true).unwrap();

        assert_eq!(int32(&ad, ad.params.shutter_control), 7);
    }

    #[test]
    fn test_shutter_control_epics_mode() {
        let mut ad = driver(8, 8, 1_000_000);
        ad.port_base
            .set_int32_param(ad.params.shutter_mode, 0, ShutterMode::EpicsOnly as i32)
            .unwrap();

        ad.set_shutter(true).unwrap();

        assert_eq!(int32(&ad, ad.params.shutter_control_epics), 1);
        let shutter_status = ad
            .port_base
            .get_int32_param_strict(ad.params.shutter_status, 0)
            .ok();
        assert_eq!(
            shutter_status, None,
            "SHUTTER_STATUS must remain unset by setShutter"
        );
    }

    #[test]
    fn test_shutter_epics_mode_sleeps_open_minus_close() {
        let mut ad = driver(8, 8, 1_000_000);
        ad.port_base
            .set_int32_param(ad.params.shutter_mode, 0, ShutterMode::EpicsOnly as i32)
            .unwrap();
        ad.port_base
            .set_float64_param(ad.params.shutter_open_delay, 0, 0.05)
            .unwrap();
        ad.port_base
            .set_float64_param(ad.params.shutter_close_delay, 0, 0.01)
            .unwrap();

        let began = std::time::Instant::now();
        ad.set_shutter(true).unwrap();
        let elapsed = began.elapsed();

        assert!(
            elapsed >= std::time::Duration::from_millis(35),
            "expected ~40ms sleep, got {elapsed:?}"
        );
    }

    #[test]
    fn test_set_acquire_drives_acquire_busy() {
        let mut ad = driver(8, 8, 1_000_000);

        ad.set_acquire(1).unwrap();
        assert_eq!(int32(&ad, ad.params.acquire), 1);
        assert_eq!(int32(&ad, ad.params.acquire_busy), 1);

        ad.set_acquire(0).unwrap();
        assert_eq!(int32(&ad, ad.params.acquire_busy), 0);
    }

    #[test]
    fn test_set_acquire_wait_for_plugins_gating() {
        let mut ad = driver(8, 8, 1_000_000);
        ad.port_base
            .set_int32_param(ad.params.wait_for_plugins, 0, 1)
            .unwrap();

        // Stopping while an array is still queued must leave busy asserted.
        ad.set_acquire(1).unwrap();
        ad.queued_counter.increment();
        ad.set_acquire(0).unwrap();
        assert_eq!(int32(&ad, ad.params.acquire_busy), 1);

        // Once the queue drains, busy is released.
        ad.queued_counter.decrement();
        ad.set_num_queued_arrays(0).unwrap();
        assert_eq!(int32(&ad, ad.params.acquire_busy), 0);
    }

    #[test]
    fn test_write_int32_pool_empty_free_list() {
        let mut ad = driver(8, 8, 10_000_000);
        let buffer = alloc_u8(&ad, vec![NDDimension::new(100)]);
        ad.pool.release(buffer);
        assert_eq!(ad.pool.num_free_buffers(), 1);

        let handled = ad
            .write_int32_pool(ad.params.base.pool_empty_free_list, 1)
            .unwrap();

        assert!(handled);
        assert_eq!(ad.pool.num_free_buffers(), 0);
    }

    #[test]
    fn test_write_int32_pool_pre_alloc() {
        let mut ad = driver(8, 8, 10_000_000);
        let template = alloc_u8(&ad, vec![NDDimension::new(64)]);
        ad.prepare_array(Arc::new(template)).unwrap();
        ad.port_base
            .set_int32_param(ad.params.base.pool_num_pre_alloc_buffers, 0, 3)
            .unwrap();

        let handled = ad
            .write_int32_pool(ad.params.base.pool_pre_alloc, 1)
            .unwrap();

        assert!(handled);
        assert_eq!(ad.pool.num_free_buffers(), 3);
        assert_eq!(int32(&ad, ad.params.base.pool_pre_alloc), 0);
    }

    #[test]
    fn test_write_int32_pool_unrecognized() {
        let mut ad = driver(8, 8, 1_000_000);

        let handled = ad.write_int32_pool(ad.params.gain, 1).unwrap();

        assert!(!handled);
    }

    #[test]
    fn test_gain_and_temperature() {
        let ad = driver(8, 8, 1_000_000);

        let gain = ad.port_base.get_float64_param(ad.params.gain, 0).unwrap();
        assert_eq!(gain, 1.0);

        let temperature = ad
            .port_base
            .get_float64_param(ad.params.temperature, 0)
            .unwrap();
        assert_eq!(temperature, 25.0);
    }

    #[test]
    fn test_connect_downstream() {
        let mut ad = driver(8, 8, 1_000_000);
        let (sender, mut receiver) = ndarray_channel("DOWNSTREAM", 10);
        ad.connect_downstream(sender);
        let frame = alloc_u8(&ad, vec![NDDimension::new(8), NDDimension::new(8)]);
        let id = frame.unique_id;

        let to_publish = ad.prepare_array(Arc::new(frame)).unwrap().unwrap();
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let _ = rt.block_on(ad.array_output.publish(to_publish));

        let received = receiver.blocking_recv().unwrap();
        assert_eq!(received.unique_id, id);
    }
}