// ad_plugins_rs/roi_stat.rs

1//! NDPluginROIStat: computes basic statistics for multiple ROI regions on each array.
2//!
3//! Each ROI is a rectangular sub-region of a 2D image. For each enabled ROI,
4//! the plugin computes min, max, mean, total, and net (background-subtracted total).
5//! Optionally accumulates time series data in circular buffers.
6
7use std::sync::Arc;
8
9use ad_core_rs::ndarray::{NDArray, NDDataBuffer};
10use ad_core_rs::ndarray_pool::NDArrayPool;
11use ad_core_rs::plugin::runtime::{
12    NDPluginProcess, ParamUpdate, PluginParamSnapshot, PluginRuntimeHandle, ProcessResult,
13};
14use ad_core_rs::plugin::wiring::WiringRegistry;
15use asyn_rs::param::ParamType;
16use asyn_rs::port::PortDriverBase;
17use parking_lot::Mutex;
18
19#[cfg(feature = "parallel")]
20use crate::par_util;
21use crate::time_series::{TimeSeriesData, TimeSeriesSender};
22#[cfg(feature = "parallel")]
23use rayon::prelude::*;
24
/// Settings describing one rectangular region of interest.
#[derive(Debug, Clone)]
pub struct ROIStatROI {
    /// When `false`, the region is skipped and its statistics are reported as zero.
    pub enabled: bool,
    /// Position of the region's first pixel, as `[x, y]`.
    pub offset: [usize; 2],
    /// Extent of the region in pixels, as `[x, y]`.
    pub size: [usize; 2],
    /// Thickness (in pixels) of the border used to estimate the background.
    /// A value of 0 disables background subtraction.
    pub bgd_width: usize,
}

impl Default for ROIStatROI {
    /// An enabled, zero-sized region at the origin without background subtraction.
    fn default() -> Self {
        let zero_pair = [0usize; 2];
        Self {
            enabled: true,
            offset: zero_pair,
            size: zero_pair,
            bgd_width: 0,
        }
    }
}
47
/// Statistics computed for a single ROI.
///
/// The `Default` value (all zeros) is what a disabled, empty, or fully
/// out-of-bounds ROI reports.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ROIStatResult {
    /// Smallest pixel value inside the (clamped) ROI.
    pub min: f64,
    /// Largest pixel value inside the (clamped) ROI.
    pub max: f64,
    /// Arithmetic mean of the pixel values inside the ROI.
    pub mean: f64,
    /// Sum of the pixel values inside the ROI.
    pub total: f64,
    /// Net = total - background_average * roi_elements.
    /// Equal to `total` when `bgd_width` is 0 (no background subtraction).
    pub net: f64,
}
58
/// State of the time-series accumulation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TSMode {
    /// Processed arrays do not add samples to the time-series buffers.
    Idle,
    /// Each processed array appends one sample per ROI statistic.
    Acquiring,
}
65
/// How many statistics are tracked for every ROI (min, max, mean, total, net).
const NUM_STATS: usize = 5;

/// Channel-name suffix of each per-ROI statistic, in buffer order.
const ROI_STAT_NAMES: [&str; NUM_STATS] = ["MinValue", "MaxValue", "MeanValue", "Total", "Net"];

/// Build the time series channel names for a ROIStat plugin with `num_rois` ROIs.
///
/// ROIs are numbered from 1 and each contributes one name per statistic, so the
/// result reads "TS1:MinValue", "TS1:MaxValue", ..., "TS2:MinValue", and so on.
pub fn roi_stat_ts_channel_names(num_rois: usize) -> Vec<String> {
    let mut channel_names = Vec::with_capacity(num_rois * NUM_STATS);
    channel_names.extend((1..=num_rois).flat_map(|roi_number| {
        ROI_STAT_NAMES
            .iter()
            .map(move |suffix| format!("TS{}:{}", roi_number, suffix))
    }));
    channel_names
}
83
/// Parameter indices for NDROIStat plugin-specific params.
///
/// Every field holds the index returned by `create_param` for the asyn
/// parameter named in its doc comment. Per-ROI params use a single index and
/// are differentiated by asyn addr (0..N). All indices are 0 until
/// `register_params` has run.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ROIStatParams {
    // Global (addr 0)
    /// `ROISTAT_RESETALL`: clears the stored results of every ROI.
    pub reset_all: usize,
    /// `ROISTAT_TS_CONTROL`: time-series command (erase/start/stop/read).
    pub ts_control: usize,
    /// `ROISTAT_TS_NUM_POINTS`: number of time-series points to collect.
    pub ts_num_points: usize,
    /// `ROISTAT_TS_CURRENT_POINT`: number of points collected so far.
    pub ts_current_point: usize,
    /// `ROISTAT_TS_ACQUIRING`: 1 while the time series is acquiring, else 0.
    pub ts_acquiring: usize,
    // Per-ROI (same index, different addr)
    /// `ROISTAT_USE`: enables or disables the ROI.
    pub use_: usize,
    /// `ROISTAT_NAME`: ROI name (octet string).
    pub name: usize,
    /// `ROISTAT_RESET`: clears the stored result of one ROI.
    pub reset: usize,
    /// `ROISTAT_BGD_WIDTH`: background border width in pixels.
    pub bgd_width: usize,
    /// `ROISTAT_DIM0_MIN`: ROI x offset.
    pub dim0_min: usize,
    /// `ROISTAT_DIM1_MIN`: ROI y offset.
    pub dim1_min: usize,
    /// `ROISTAT_DIM0_SIZE`: ROI x size.
    pub dim0_size: usize,
    /// `ROISTAT_DIM1_SIZE`: ROI y size.
    pub dim1_size: usize,
    /// `ROISTAT_DIM0_MAX_SIZE`: x size of the last processed array.
    pub dim0_max_size: usize,
    /// `ROISTAT_DIM1_MAX_SIZE`: y size of the last processed array.
    pub dim1_max_size: usize,
    /// `ROISTAT_MIN_VALUE`: minimum pixel value in the ROI.
    pub min_value: usize,
    /// `ROISTAT_MAX_VALUE`: maximum pixel value in the ROI.
    pub max_value: usize,
    /// `ROISTAT_MEAN_VALUE`: mean pixel value in the ROI.
    pub mean_value: usize,
    /// `ROISTAT_TOTAL`: sum of the pixel values in the ROI.
    pub total: usize,
    /// `ROISTAT_NET`: background-subtracted total.
    pub net: usize,
    // Time series waveform arrays (per-ROI, differentiated by addr)
    /// `ROISTAT_TS_TOTAL`: time series of the total.
    pub ts_total: usize,
    /// `ROISTAT_TS_NET`: time series of the net value.
    pub ts_net: usize,
    /// `ROISTAT_TS_MEAN_VALUE`: time series of the mean.
    pub ts_mean_value: usize,
    /// `ROISTAT_TS_MIN_VALUE`: time series of the minimum.
    pub ts_min_value: usize,
    /// `ROISTAT_TS_MAX_VALUE`: time series of the maximum.
    pub ts_max_value: usize,
    /// `ROISTAT_TS_TIMESTAMP`: time-series timestamp waveform.
    pub ts_timestamp: usize,
}
119
120/// Processor that computes ROI statistics on 2D arrays.
121pub struct ROIStatProcessor {
122    rois: Vec<ROIStatROI>,
123    results: Vec<ROIStatResult>,
124    /// Time series buffers: [roi_index][stat_index][time_point].
125    ts_mode: TSMode,
126    ts_buffers: Vec<Vec<Vec<f64>>>,
127    ts_num_points: usize,
128    ts_current: usize,
129    /// Optional sender to push flattened stats to a TimeSeriesPortDriver.
130    ts_sender: Option<TimeSeriesSender>,
131    /// Registered asyn param indices.
132    params: ROIStatParams,
133    /// Shared cell to export params after register_params is called.
134    params_out: Arc<Mutex<ROIStatParams>>,
135}
136
137impl ROIStatProcessor {
138    /// Create a new processor with the given ROI definitions.
139    pub fn new(rois: Vec<ROIStatROI>, ts_num_points: usize) -> Self {
140        let n = rois.len();
141        let results = vec![ROIStatResult::default(); n];
142        let ts_buffers = vec![vec![Vec::new(); NUM_STATS]; n];
143        Self {
144            rois,
145            results,
146            ts_mode: TSMode::Idle,
147            ts_buffers,
148            ts_num_points,
149            ts_current: 0,
150            ts_sender: None,
151            params: ROIStatParams::default(),
152            params_out: Arc::new(Mutex::new(ROIStatParams::default())),
153        }
154    }
155
156    /// Get a shared handle to the params (populated after register_params is called).
157    pub fn params_handle(&self) -> Arc<Mutex<ROIStatParams>> {
158        self.params_out.clone()
159    }
160
161    /// Get the current results for all ROIs.
162    pub fn results(&self) -> &[ROIStatResult] {
163        &self.results
164    }
165
166    /// Get the ROI definitions.
167    pub fn rois(&self) -> &[ROIStatROI] {
168        &self.rois
169    }
170
171    /// Mutable access to ROI definitions.
172    pub fn rois_mut(&mut self) -> &mut Vec<ROIStatROI> {
173        &mut self.rois
174    }
175
176    /// Set the time series mode.
177    pub fn set_ts_mode(&mut self, mode: TSMode) {
178        if mode == TSMode::Acquiring && self.ts_mode != TSMode::Acquiring {
179            // Reset time series on start
180            for roi_bufs in &mut self.ts_buffers {
181                for stat_buf in roi_bufs.iter_mut() {
182                    stat_buf.clear();
183                }
184            }
185            self.ts_current = 0;
186        }
187        self.ts_mode = mode;
188    }
189
190    /// Get time series buffer for a specific ROI and stat index.
191    /// stat_index: 0=min, 1=max, 2=mean, 3=total, 4=net
192    pub fn ts_buffer(&self, roi_index: usize, stat_index: usize) -> &[f64] {
193        if roi_index < self.ts_buffers.len() && stat_index < NUM_STATS {
194            &self.ts_buffers[roi_index][stat_index]
195        } else {
196            &[]
197        }
198    }
199
200    /// Set the sender for pushing time series data to a TimeSeriesPortDriver.
201    pub fn set_ts_sender(&mut self, sender: TimeSeriesSender) {
202        self.ts_sender = Some(sender);
203    }
204
205    /// Compute statistics for a single ROI on a 2D data buffer.
206    pub fn compute_roi_stats(
207        data: &NDDataBuffer,
208        x_size: usize,
209        y_size: usize,
210        roi: &ROIStatROI,
211    ) -> ROIStatResult {
212        let roi_x = roi.offset[0];
213        let roi_y = roi.offset[1];
214        let roi_w = roi.size[0];
215        let roi_h = roi.size[1];
216
217        // Clamp ROI to image bounds
218        if roi_x >= x_size || roi_y >= y_size || roi_w == 0 || roi_h == 0 {
219            return ROIStatResult::default();
220        }
221        let roi_w = roi_w.min(x_size - roi_x);
222        let roi_h = roi_h.min(y_size - roi_y);
223
224        let mut min = f64::MAX;
225        let mut max = f64::MIN;
226        let mut total = 0.0f64;
227        let mut count = 0usize;
228
229        for iy in roi_y..(roi_y + roi_h) {
230            for ix in roi_x..(roi_x + roi_w) {
231                let idx = iy * x_size + ix;
232                if let Some(val) = data.get_as_f64(idx) {
233                    if val < min {
234                        min = val;
235                    }
236                    if val > max {
237                        max = val;
238                    }
239                    total += val;
240                    count += 1;
241                }
242            }
243        }
244
245        if count == 0 {
246            return ROIStatResult::default();
247        }
248
249        let mean = total / count as f64;
250
251        // Background subtraction
252        let net = if roi.bgd_width > 0 {
253            let bgd = Self::compute_background(data, x_size, y_size, roi);
254            total - bgd * count as f64
255        } else {
256            total
257        };
258
259        ROIStatResult {
260            min,
261            max,
262            mean,
263            total,
264            net,
265        }
266    }
267
268    /// Compute average background from the border of the ROI.
269    fn compute_background(
270        data: &NDDataBuffer,
271        x_size: usize,
272        y_size: usize,
273        roi: &ROIStatROI,
274    ) -> f64 {
275        let roi_x = roi.offset[0];
276        let roi_y = roi.offset[1];
277        let roi_w = roi.size[0].min(x_size.saturating_sub(roi_x));
278        let roi_h = roi.size[1].min(y_size.saturating_sub(roi_y));
279        let bw = roi.bgd_width;
280
281        if bw == 0 || roi_w == 0 || roi_h == 0 {
282            return 0.0;
283        }
284
285        let mut bgd_total = 0.0f64;
286        let mut bgd_count = 0usize;
287
288        for iy in roi_y..(roi_y + roi_h) {
289            for ix in roi_x..(roi_x + roi_w) {
290                // Check if this pixel is in the border region
291                let dx_from_left = ix - roi_x;
292                let dx_from_right = (roi_x + roi_w - 1) - ix;
293                let dy_from_top = iy - roi_y;
294                let dy_from_bottom = (roi_y + roi_h - 1) - iy;
295
296                let in_border = dx_from_left < bw
297                    || dx_from_right < bw
298                    || dy_from_top < bw
299                    || dy_from_bottom < bw;
300
301                if in_border {
302                    let idx = iy * x_size + ix;
303                    if let Some(val) = data.get_as_f64(idx) {
304                        bgd_total += val;
305                        bgd_count += 1;
306                    }
307                }
308            }
309        }
310
311        if bgd_count == 0 {
312            0.0
313        } else {
314            bgd_total / bgd_count as f64
315        }
316    }
317}
318
319impl NDPluginProcess for ROIStatProcessor {
320    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
321        let info = array.info();
322        let x_size = info.x_size;
323        let y_size = info.y_size;
324
325        // Ensure results vec matches rois
326        self.results
327            .resize(self.rois.len(), ROIStatResult::default());
328
329        #[cfg(feature = "parallel")]
330        {
331            let total_elements: usize = self
332                .rois
333                .iter()
334                .filter(|r| r.enabled)
335                .map(|r| r.size[0] * r.size[1])
336                .sum();
337
338            if par_util::should_parallelize(total_elements) {
339                let data = &array.data;
340                let rois = &self.rois;
341                let new_results: Vec<ROIStatResult> = par_util::thread_pool().install(|| {
342                    rois.par_iter()
343                        .map(|roi| {
344                            if roi.enabled {
345                                Self::compute_roi_stats(data, x_size, y_size, roi)
346                            } else {
347                                ROIStatResult::default()
348                            }
349                        })
350                        .collect()
351                });
352                self.results = new_results;
353            } else {
354                for (i, roi) in self.rois.iter().enumerate() {
355                    if !roi.enabled {
356                        self.results[i] = ROIStatResult::default();
357                        continue;
358                    }
359                    self.results[i] = Self::compute_roi_stats(&array.data, x_size, y_size, roi);
360                }
361            }
362        }
363
364        #[cfg(not(feature = "parallel"))]
365        for (i, roi) in self.rois.iter().enumerate() {
366            if !roi.enabled {
367                self.results[i] = ROIStatResult::default();
368                continue;
369            }
370            self.results[i] = Self::compute_roi_stats(&array.data, x_size, y_size, roi);
371        }
372
373        // Accumulate time series (fixed-length: stop when full)
374        if self.ts_mode == TSMode::Acquiring {
375            if self.ts_num_points > 0 && self.ts_current >= self.ts_num_points {
376                // Buffer full — stop acquiring
377                self.ts_mode = TSMode::Idle;
378            } else {
379                // Ensure ts_buffers match roi count
380                while self.ts_buffers.len() < self.rois.len() {
381                    self.ts_buffers.push(vec![Vec::new(); NUM_STATS]);
382                }
383
384                for (i, result) in self.results.iter().enumerate() {
385                    if i >= self.ts_buffers.len() {
386                        break;
387                    }
388                    let stats = [
389                        result.min,
390                        result.max,
391                        result.mean,
392                        result.total,
393                        result.net,
394                    ];
395                    for (s, &val) in stats.iter().enumerate() {
396                        let buf = &mut self.ts_buffers[i][s];
397                        buf.push(val);
398                    }
399                }
400                self.ts_current += 1;
401            }
402        }
403
404        // Send flattened stats to TimeSeriesPortDriver if connected
405        if let Some(ref sender) = self.ts_sender {
406            let mut values = Vec::with_capacity(self.results.len() * NUM_STATS);
407            for result in &self.results {
408                values.push(result.min);
409                values.push(result.max);
410                values.push(result.mean);
411                values.push(result.total);
412                values.push(result.net);
413            }
414            let _ = sender.try_send(TimeSeriesData { values });
415        }
416
417        // Build per-ROI param updates (only for enabled ROIs)
418        let p = &self.params;
419        let mut updates = Vec::new();
420        for (i, roi) in self.rois.iter().enumerate() {
421            if !roi.enabled {
422                continue;
423            }
424            let result = &self.results[i];
425            let addr = i as i32;
426            updates.push(ParamUpdate::float64_addr(p.min_value, addr, result.min));
427            updates.push(ParamUpdate::float64_addr(p.max_value, addr, result.max));
428            updates.push(ParamUpdate::float64_addr(p.mean_value, addr, result.mean));
429            updates.push(ParamUpdate::float64_addr(p.total, addr, result.total));
430            updates.push(ParamUpdate::float64_addr(p.net, addr, result.net));
431            updates.push(ParamUpdate::int32_addr(
432                p.dim0_max_size,
433                addr,
434                x_size as i32,
435            ));
436            updates.push(ParamUpdate::int32_addr(
437                p.dim1_max_size,
438                addr,
439                y_size as i32,
440            ));
441        }
442        updates.push(ParamUpdate::int32(
443            p.ts_current_point,
444            self.ts_current as i32,
445        ));
446        updates.push(ParamUpdate::int32(
447            p.ts_acquiring,
448            if self.ts_mode == TSMode::Acquiring {
449                1
450            } else {
451                0
452            },
453        ));
454
455        // Write time series buffers to params for waveform readback
456        for (i, roi) in self.rois.iter().enumerate() {
457            if !roi.enabled || i >= self.ts_buffers.len() {
458                continue;
459            }
460            let addr = i as i32;
461            let bufs = &self.ts_buffers[i];
462            // stat order: min=0, max=1, mean=2, total=3, net=4
463            if !bufs.is_empty() {
464                updates.push(ParamUpdate::float64_array_addr(
465                    p.ts_min_value,
466                    addr,
467                    bufs[0].clone(),
468                ));
469            }
470            if bufs.len() > 1 {
471                updates.push(ParamUpdate::float64_array_addr(
472                    p.ts_max_value,
473                    addr,
474                    bufs[1].clone(),
475                ));
476            }
477            if bufs.len() > 2 {
478                updates.push(ParamUpdate::float64_array_addr(
479                    p.ts_mean_value,
480                    addr,
481                    bufs[2].clone(),
482                ));
483            }
484            if bufs.len() > 3 {
485                updates.push(ParamUpdate::float64_array_addr(
486                    p.ts_total,
487                    addr,
488                    bufs[3].clone(),
489                ));
490            }
491            if bufs.len() > 4 {
492                updates.push(ParamUpdate::float64_array_addr(
493                    p.ts_net,
494                    addr,
495                    bufs[4].clone(),
496                ));
497            }
498        }
499
500        ProcessResult::sink(updates)
501    }
502
503    fn plugin_type(&self) -> &str {
504        "NDPluginROIStat"
505    }
506
507    fn register_params(
508        &mut self,
509        base: &mut PortDriverBase,
510    ) -> Result<(), asyn_rs::error::AsynError> {
511        // Global params
512        self.params.reset_all = base.create_param("ROISTAT_RESETALL", ParamType::Int32)?;
513        self.params.ts_control = base.create_param("ROISTAT_TS_CONTROL", ParamType::Int32)?;
514        self.params.ts_num_points = base.create_param("ROISTAT_TS_NUM_POINTS", ParamType::Int32)?;
515        base.set_int32_param(self.params.ts_num_points, 0, self.ts_num_points as i32)?;
516        self.params.ts_current_point =
517            base.create_param("ROISTAT_TS_CURRENT_POINT", ParamType::Int32)?;
518        self.params.ts_acquiring = base.create_param("ROISTAT_TS_ACQUIRING", ParamType::Int32)?;
519
520        // Per-ROI params (single index, differentiated by addr)
521        self.params.use_ = base.create_param("ROISTAT_USE", ParamType::Int32)?;
522        self.params.name = base.create_param("ROISTAT_NAME", ParamType::Octet)?;
523        self.params.reset = base.create_param("ROISTAT_RESET", ParamType::Int32)?;
524        self.params.bgd_width = base.create_param("ROISTAT_BGD_WIDTH", ParamType::Int32)?;
525        self.params.dim0_min = base.create_param("ROISTAT_DIM0_MIN", ParamType::Int32)?;
526        self.params.dim1_min = base.create_param("ROISTAT_DIM1_MIN", ParamType::Int32)?;
527        self.params.dim0_size = base.create_param("ROISTAT_DIM0_SIZE", ParamType::Int32)?;
528        self.params.dim1_size = base.create_param("ROISTAT_DIM1_SIZE", ParamType::Int32)?;
529        self.params.dim0_max_size = base.create_param("ROISTAT_DIM0_MAX_SIZE", ParamType::Int32)?;
530        self.params.dim1_max_size = base.create_param("ROISTAT_DIM1_MAX_SIZE", ParamType::Int32)?;
531        self.params.min_value = base.create_param("ROISTAT_MIN_VALUE", ParamType::Float64)?;
532        self.params.max_value = base.create_param("ROISTAT_MAX_VALUE", ParamType::Float64)?;
533        self.params.mean_value = base.create_param("ROISTAT_MEAN_VALUE", ParamType::Float64)?;
534        self.params.total = base.create_param("ROISTAT_TOTAL", ParamType::Float64)?;
535        self.params.net = base.create_param("ROISTAT_NET", ParamType::Float64)?;
536
537        // Time series waveform arrays (per-ROI)
538        self.params.ts_total = base.create_param("ROISTAT_TS_TOTAL", ParamType::Float64Array)?;
539        self.params.ts_net = base.create_param("ROISTAT_TS_NET", ParamType::Float64Array)?;
540        self.params.ts_mean_value =
541            base.create_param("ROISTAT_TS_MEAN_VALUE", ParamType::Float64Array)?;
542        self.params.ts_min_value =
543            base.create_param("ROISTAT_TS_MIN_VALUE", ParamType::Float64Array)?;
544        self.params.ts_max_value =
545            base.create_param("ROISTAT_TS_MAX_VALUE", ParamType::Float64Array)?;
546        self.params.ts_timestamp =
547            base.create_param("ROISTAT_TS_TIMESTAMP", ParamType::Float64Array)?;
548
549        // Set initial per-ROI values
550        for (i, roi) in self.rois.iter().enumerate() {
551            let addr = i as i32;
552            base.set_int32_param(self.params.use_, addr, roi.enabled as i32)?;
553            base.set_int32_param(self.params.bgd_width, addr, roi.bgd_width as i32)?;
554            base.set_int32_param(self.params.dim0_min, addr, roi.offset[0] as i32)?;
555            base.set_int32_param(self.params.dim1_min, addr, roi.offset[1] as i32)?;
556            base.set_int32_param(self.params.dim0_size, addr, roi.size[0] as i32)?;
557            base.set_int32_param(self.params.dim1_size, addr, roi.size[1] as i32)?;
558        }
559
560        // Export params
561        *self.params_out.lock() = self.params;
562
563        Ok(())
564    }
565
566    fn on_param_change(
567        &mut self,
568        reason: usize,
569        snapshot: &PluginParamSnapshot,
570    ) -> ad_core_rs::plugin::runtime::ParamChangeResult {
571        let addr = snapshot.addr as usize;
572        let p = &self.params;
573
574        if reason == p.use_ && addr < self.rois.len() {
575            self.rois[addr].enabled = snapshot.value.as_i32() != 0;
576        } else if reason == p.dim0_min && addr < self.rois.len() {
577            self.rois[addr].offset[0] = snapshot.value.as_i32().max(0) as usize;
578        } else if reason == p.dim1_min && addr < self.rois.len() {
579            self.rois[addr].offset[1] = snapshot.value.as_i32().max(0) as usize;
580        } else if reason == p.dim0_size && addr < self.rois.len() {
581            self.rois[addr].size[0] = snapshot.value.as_i32().max(0) as usize;
582        } else if reason == p.dim1_size && addr < self.rois.len() {
583            self.rois[addr].size[1] = snapshot.value.as_i32().max(0) as usize;
584        } else if reason == p.bgd_width && addr < self.rois.len() {
585            self.rois[addr].bgd_width = snapshot.value.as_i32().max(0) as usize;
586        } else if reason == p.reset && addr < self.rois.len() {
587            self.results[addr] = ROIStatResult::default();
588        } else if reason == p.reset_all {
589            for r in &mut self.results {
590                *r = ROIStatResult::default();
591            }
592        } else if reason == p.ts_control {
593            // 0=EraseStart (clear+start), 1=Start (resume), 2=Stop, 3=Read, 4=Erase
594            match snapshot.value.as_i32() {
595                0 => {
596                    // EraseStart: clear buffers then start
597                    for roi_bufs in &mut self.ts_buffers {
598                        for stat_buf in roi_bufs.iter_mut() {
599                            stat_buf.clear();
600                        }
601                    }
602                    self.ts_current = 0;
603                    self.ts_mode = TSMode::Acquiring;
604                }
605                1 => {
606                    // Start: resume without clearing
607                    self.ts_mode = TSMode::Acquiring;
608                }
609                2 => {
610                    // Stop
611                    self.ts_mode = TSMode::Idle;
612                }
613                3 => {
614                    // Read: callback without stopping (no-op here, param update triggers read)
615                }
616                4 => {
617                    // Erase: clear buffers
618                    for roi_bufs in &mut self.ts_buffers {
619                        for stat_buf in roi_bufs.iter_mut() {
620                            stat_buf.clear();
621                        }
622                    }
623                    self.ts_current = 0;
624                }
625                _ => {}
626            }
627        } else if reason == p.ts_num_points {
628            self.ts_num_points = snapshot.value.as_i32().max(0) as usize;
629        }
630        ad_core_rs::plugin::runtime::ParamChangeResult::empty()
631    }
632}
633
634/// Create a ROIStat plugin runtime. The TS receiver is stored in the registry
635/// for later pickup by `NDTimeSeriesConfigure`.
636pub fn create_roi_stat_runtime(
637    port_name: &str,
638    pool: Arc<NDArrayPool>,
639    queue_size: usize,
640    ndarray_port: &str,
641    wiring: Arc<WiringRegistry>,
642    num_rois: usize,
643    ts_registry: &crate::time_series::TsReceiverRegistry,
644) -> (
645    PluginRuntimeHandle,
646    ROIStatParams,
647    std::thread::JoinHandle<()>,
648) {
649    let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
650
651    let rois: Vec<ROIStatROI> = (0..num_rois).map(|_| ROIStatROI::default()).collect();
652    let mut processor = ROIStatProcessor::new(rois, 2048);
653    processor.set_ts_sender(ts_tx);
654    let params_handle = processor.params_handle();
655
656    let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime_multi_addr(
657        port_name,
658        processor,
659        pool,
660        queue_size,
661        ndarray_port,
662        wiring,
663        num_rois,
664    );
665
666    let roi_stat_params = *params_handle.lock();
667
668    // Store the TS receiver for NDTimeSeriesConfigure to pick up
669    let channel_names = roi_stat_ts_channel_names(num_rois);
670    ts_registry.store(port_name, ts_rx, channel_names);
671
672    (handle, roi_stat_params, data_jh)
673}
674
675#[cfg(test)]
676mod tests {
677    use super::*;
678    use ad_core_rs::ndarray::{NDDataType, NDDimension};
679
680    fn make_2d_array(x: usize, y: usize, fill: impl Fn(usize, usize) -> f64) -> NDArray {
681        let mut arr = NDArray::new(
682            vec![NDDimension::new(x), NDDimension::new(y)],
683            NDDataType::Float64,
684        );
685        if let NDDataBuffer::F64(ref mut v) = arr.data {
686            for iy in 0..y {
687                for ix in 0..x {
688                    v[iy * x + ix] = fill(ix, iy);
689                }
690            }
691        }
692        arr
693    }
694
695    #[test]
696    fn test_single_roi_full_image() {
697        let arr = make_2d_array(4, 4, |_x, _y| 10.0);
698        let rois = vec![ROIStatROI {
699            enabled: true,
700            offset: [0, 0],
701            size: [4, 4],
702            bgd_width: 0,
703        }];
704
705        let mut proc = ROIStatProcessor::new(rois, 0);
706        let pool = NDArrayPool::new(1_000_000);
707        proc.process_array(&arr, &pool);
708
709        let r = &proc.results()[0];
710        assert!((r.min - 10.0).abs() < 1e-10);
711        assert!((r.max - 10.0).abs() < 1e-10);
712        assert!((r.mean - 10.0).abs() < 1e-10);
713        assert!((r.total - 160.0).abs() < 1e-10);
714    }
715
716    #[test]
717    fn test_single_roi_subregion() {
718        // 8x8 image, values = x + y * 8
719        let arr = make_2d_array(8, 8, |x, y| (x + y * 8) as f64);
720
721        let rois = vec![ROIStatROI {
722            enabled: true,
723            offset: [2, 2],
724            size: [3, 3],
725            bgd_width: 0,
726        }];
727
728        let mut proc = ROIStatProcessor::new(rois, 0);
729        let pool = NDArrayPool::new(1_000_000);
730        proc.process_array(&arr, &pool);
731
732        let r = &proc.results()[0];
733        // ROI pixels: (2,2)=18, (3,2)=19, (4,2)=20, (2,3)=26, (3,3)=27, (4,3)=28, (2,4)=34, (3,4)=35, (4,4)=36
734        assert!((r.min - 18.0).abs() < 1e-10);
735        assert!((r.max - 36.0).abs() < 1e-10);
736        let expected_total = 18.0 + 19.0 + 20.0 + 26.0 + 27.0 + 28.0 + 34.0 + 35.0 + 36.0;
737        assert!((r.total - expected_total).abs() < 1e-10);
738        assert!((r.mean - expected_total / 9.0).abs() < 1e-10);
739    }
740
741    #[test]
742    fn test_multiple_rois() {
743        let arr = make_2d_array(8, 8, |x, _y| x as f64);
744
745        let rois = vec![
746            ROIStatROI {
747                enabled: true,
748                offset: [0, 0],
749                size: [4, 4],
750                bgd_width: 0,
751            },
752            ROIStatROI {
753                enabled: true,
754                offset: [4, 0],
755                size: [4, 4],
756                bgd_width: 0,
757            },
758        ];
759
760        let mut proc = ROIStatProcessor::new(rois, 0);
761        let pool = NDArrayPool::new(1_000_000);
762        proc.process_array(&arr, &pool);
763
764        let r0 = &proc.results()[0];
765        assert!((r0.min - 0.0).abs() < 1e-10);
766        assert!((r0.max - 3.0).abs() < 1e-10);
767
768        let r1 = &proc.results()[1];
769        assert!((r1.min - 4.0).abs() < 1e-10);
770        assert!((r1.max - 7.0).abs() < 1e-10);
771    }
772
773    #[test]
774    fn test_bgd_width() {
775        // 6x6 image, center 2x2 has value 100, border has value 10
776        let arr = make_2d_array(6, 6, |x, y| {
777            if x >= 2 && x < 4 && y >= 2 && y < 4 {
778                100.0
779            } else {
780                10.0
781            }
782        });
783
784        let rois = vec![ROIStatROI {
785            enabled: true,
786            offset: [1, 1],
787            size: [4, 4],
788            bgd_width: 1,
789        }];
790
791        let mut proc = ROIStatProcessor::new(rois, 0);
792        let pool = NDArrayPool::new(1_000_000);
793        proc.process_array(&arr, &pool);
794
795        let r = &proc.results()[0];
796        // ROI is 4x4 at (1,1): border pixels = 12 (all with value 10), center = 4 (value 100)
797        // bgd average = (12*10 + ... well, border includes some 100s)
798        // Actually border pixels at bgd_width=1: the outer ring of the 4x4 ROI
799        // That outer ring occupies 12 of 16 pixels
800        assert!(
801            r.net < r.total,
802            "net should be less than total with bgd subtraction"
803        );
804    }
805
806    #[test]
807    fn test_empty_roi() {
808        let arr = make_2d_array(4, 4, |_, _| 10.0);
809        let rois = vec![ROIStatROI {
810            enabled: true,
811            offset: [0, 0],
812            size: [0, 0],
813            bgd_width: 0,
814        }];
815
816        let mut proc = ROIStatProcessor::new(rois, 0);
817        let pool = NDArrayPool::new(1_000_000);
818        proc.process_array(&arr, &pool);
819
820        let r = &proc.results()[0];
821        assert!((r.total - 0.0).abs() < 1e-10);
822    }
823
824    #[test]
825    fn test_disabled_roi() {
826        let arr = make_2d_array(4, 4, |_, _| 10.0);
827        let rois = vec![ROIStatROI {
828            enabled: false,
829            offset: [0, 0],
830            size: [4, 4],
831            bgd_width: 0,
832        }];
833
834        let mut proc = ROIStatProcessor::new(rois, 0);
835        let pool = NDArrayPool::new(1_000_000);
836        proc.process_array(&arr, &pool);
837
838        let r = &proc.results()[0];
839        assert!(
840            (r.total - 0.0).abs() < 1e-10,
841            "disabled ROI should have zero stats"
842        );
843    }
844
/// An ROI whose offset lies entirely outside the image must report a zero total.
#[test]
fn test_roi_out_of_bounds() {
    let pool = NDArrayPool::new(1_000_000);
    let image = make_2d_array(4, 4, |_, _| 10.0);

    // Offset (10, 10) is past the edge of the 4x4 image on both axes.
    let outside = ROIStatROI {
        enabled: true,
        offset: [10, 10],
        size: [4, 4],
        bgd_width: 0,
    };

    let mut processor = ROIStatProcessor::new(vec![outside], 0);
    processor.process_array(&image, &pool);

    let total = processor.results()[0].total;
    assert!(
        total.abs() < 1e-10,
        "out-of-bounds ROI should produce zero stats"
    );
}
865
/// An ROI that runs past the image edge is expected to be clamped to the
/// overlapping area before statistics are computed.
#[test]
fn test_roi_partially_out_of_bounds() {
    let image = make_2d_array(4, 4, |_, _| 5.0);
    let pool = NDArrayPool::new(1_000_000);

    // Starts at (2, 2) and asks for 10x10, which extends beyond the 4x4 image.
    let oversized = ROIStatROI {
        enabled: true,
        offset: [2, 2],
        size: [10, 10],
        bgd_width: 0,
    };
    let mut processor = ROIStatProcessor::new(vec![oversized], 0);
    processor.process_array(&image, &pool);

    let stats = &processor.results()[0];
    // Only the 2x2 overlap contributes: 4 pixels of value 5 -> total 20, mean 5.
    for (actual, expected) in [(stats.total, 20.0), (stats.mean, 5.0)] {
        assert!((actual - expected).abs() < 1e-10);
    }
}
885
/// While acquiring, each processed frame appends one sample to the ROI's
/// time-series buffers; this checks the mean series over five frames.
#[test]
fn test_time_series() {
    let pool = NDArrayPool::new(1_000_000);
    let full_frame = ROIStatROI {
        enabled: true,
        offset: [0, 0],
        size: [4, 4],
        bgd_width: 0,
    };

    let mut processor = ROIStatProcessor::new(vec![full_frame], 100);
    processor.set_ts_mode(TSMode::Acquiring);

    // Feed five uniform frames whose pixel value is 1, 2, ..., 5.
    for level in 1..=5 {
        let frame = make_2d_array(4, 4, |_, _| level as f64);
        processor.process_array(&frame, &pool);
    }

    // Stat index 2 selects the mean series of ROI 0.
    let mean_stat = 2;
    let mean_series = processor.ts_buffer(0, mean_stat);
    assert_eq!(mean_series.len(), 5);
    assert!((mean_series[0] - 1.0).abs() < 1e-10);
    assert!((mean_series[4] - 5.0).abs() < 1e-10);
}
910
/// Statistics on an unsigned 8-bit array: min and max must reflect the raw
/// pixel values converted to `f64`.
#[test]
fn test_u8_data() {
    let dims = vec![NDDimension::new(4), NDDimension::new(4)];
    let mut image = NDArray::new(dims, NDDataType::UInt8);

    // Element i receives i + 1 (truncated to u8). Nothing is written if the
    // buffer is not the U8 variant.
    if let NDDataBuffer::U8(pixels) = &mut image.data {
        pixels
            .iter_mut()
            .enumerate()
            .for_each(|(idx, px)| *px = (idx + 1) as u8);
    }

    // Defaults supply `enabled = true`, a zero offset and no background border.
    let full_frame = ROIStatROI {
        size: [4, 4],
        ..Default::default()
    };

    let pool = NDArrayPool::new(1_000_000);
    let mut processor = ROIStatProcessor::new(vec![full_frame], 0);
    processor.process_array(&image, &pool);

    let stats = &processor.results()[0];
    assert!((stats.min - 1.0).abs() < 1e-10);
    assert!((stats.max - 16.0).abs() < 1e-10);
}
938
/// Channel names are laid out ROI by ROI, five statistics per ROI.
#[test]
fn test_ts_channel_names() {
    let names = roi_stat_ts_channel_names(2);
    // 2 ROIs * 5 stats
    assert_eq!(names.len(), 10);

    // Spot-check the first and last entry of each ROI's group.
    let spot_checks: [(usize, &str); 5] = [
        (0, "TS1:MinValue"),
        (1, "TS1:MaxValue"),
        (4, "TS1:Net"),
        (5, "TS2:MinValue"),
        (9, "TS2:Net"),
    ];
    for (idx, expected) in spot_checks {
        assert_eq!(names[idx], expected);
    }
}
949
/// With a time-series sender attached, processing one frame must push a single
/// message holding every ROI's statistics in ROI-major order.
#[test]
fn test_ts_sender_integration() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<TimeSeriesData>(16);

    // Two enabled ROIs anchored at the origin: the full 4x4 frame and a 2x2 corner.
    let rois: Vec<ROIStatROI> = [[4, 4], [2, 2]]
        .iter()
        .map(|&size| ROIStatROI {
            enabled: true,
            offset: [0, 0],
            size,
            bgd_width: 0,
        })
        .collect();

    let mut processor = ROIStatProcessor::new(rois, 0);
    processor.set_ts_sender(tx);

    let frame = make_2d_array(4, 4, |_, _| 7.0);
    let pool = NDArrayPool::new(1_000_000);
    processor.process_array(&frame, &pool);

    // The message must already be queued, so a non-blocking receive suffices.
    let message = rx.try_recv().unwrap();
    let values = &message.values;
    // 2 ROIs * 5 stats = 10 values
    assert_eq!(values.len(), 10);

    // (slot, expected): ROI1 min/max/mean = 7 and total = 4*4*7 = 112;
    // ROI2 total = 2*2*7 = 28 (slot 5 + 3).
    let checks = [
        (0usize, 7.0),
        (1, 7.0),
        (2, 7.0),
        (3, 112.0),
        (8, 28.0),
    ];
    for (slot, expected) in checks {
        assert!((values[slot] - expected).abs() < 1e-10);
    }
}
987}