1use std::sync::Arc;
2
3#[cfg(feature = "parallel")]
4use crate::par_util;
5#[cfg(feature = "parallel")]
6use rayon::prelude::*;
7
8use ad_core_rs::ndarray::{NDArray, NDDataBuffer};
9use ad_core_rs::ndarray_pool::NDArrayPool;
10use ad_core_rs::plugin::runtime::{
11 NDPluginProcess, ParamUpdate, PluginParamSnapshot, PluginRuntimeHandle, ProcessResult,
12};
13use ad_core_rs::plugin::wiring::WiringRegistry;
14use asyn_rs::param::ParamType;
15use asyn_rs::port::PortDriverBase;
16use parking_lot::Mutex;
17
18#[derive(Clone, Copy, Default)]
20pub struct NDStatsParams {
21 pub compute_statistics: usize,
22 pub bgd_width: usize,
23 pub min_value: usize,
24 pub max_value: usize,
25 pub mean_value: usize,
26 pub sigma_value: usize,
27 pub total: usize,
28 pub net: usize,
29 pub min_x: usize,
30 pub min_y: usize,
31 pub max_x: usize,
32 pub max_y: usize,
33 pub compute_centroid: usize,
34 pub centroid_threshold: usize,
35 pub centroid_total: usize,
36 pub centroid_x: usize,
37 pub centroid_y: usize,
38 pub sigma_x: usize,
39 pub sigma_y: usize,
40 pub sigma_xy: usize,
41 pub skewness_x: usize,
42 pub skewness_y: usize,
43 pub kurtosis_x: usize,
44 pub kurtosis_y: usize,
45 pub eccentricity: usize,
46 pub orientation: usize,
47 pub compute_histogram: usize,
48 pub hist_size: usize,
49 pub hist_min: usize,
50 pub hist_max: usize,
51 pub hist_below: usize,
52 pub hist_above: usize,
53 pub hist_entropy: usize,
54 pub compute_profiles: usize,
55 pub cursor_x: usize,
56 pub cursor_y: usize,
57}
58
59#[derive(Debug, Clone, Default)]
61pub struct StatsResult {
62 pub min: f64,
63 pub max: f64,
64 pub mean: f64,
65 pub sigma: f64,
66 pub total: f64,
67 pub net: f64,
68 pub num_elements: usize,
69 pub min_x: usize,
70 pub min_y: usize,
71 pub max_x: usize,
72 pub max_y: usize,
73 pub histogram: Vec<f64>,
74 pub hist_below: f64,
75 pub hist_above: f64,
76 pub hist_entropy: f64,
77 pub profile_avg_x: Vec<f64>,
78 pub profile_avg_y: Vec<f64>,
79 pub profile_threshold_x: Vec<f64>,
80 pub profile_threshold_y: Vec<f64>,
81 pub profile_centroid_x: Vec<f64>,
82 pub profile_centroid_y: Vec<f64>,
83 pub profile_cursor_x: Vec<f64>,
84 pub profile_cursor_y: Vec<f64>,
85}
86
87#[derive(Debug, Clone, Default)]
89pub struct CentroidResult {
90 pub centroid_x: f64,
91 pub centroid_y: f64,
92 pub sigma_x: f64,
93 pub sigma_y: f64,
94 pub sigma_xy: f64,
95 pub centroid_total: f64,
96 pub skewness_x: f64,
97 pub skewness_y: f64,
98 pub kurtosis_x: f64,
99 pub kurtosis_y: f64,
100 pub eccentricity: f64,
101 pub orientation: f64,
102}
103
104#[derive(Debug, Clone, Default)]
106pub struct ProfileResult {
107 pub avg_x: Vec<f64>,
108 pub avg_y: Vec<f64>,
109 pub threshold_x: Vec<f64>,
110 pub threshold_y: Vec<f64>,
111 pub centroid_x: Vec<f64>,
112 pub centroid_y: Vec<f64>,
113 pub cursor_x: Vec<f64>,
114 pub cursor_y: Vec<f64>,
115}
116
117pub fn compute_stats(
124 data: &NDDataBuffer,
125 dims: &[ad_core_rs::ndarray::NDDimension],
126 bgd_width: usize,
127) -> StatsResult {
128 macro_rules! stats_for {
129 ($vec:expr) => {{
130 let v = $vec;
131 if v.is_empty() {
132 return StatsResult::default();
133 }
134
135 let (min, max, min_idx, max_idx, total, variance);
136
137 #[cfg(feature = "parallel")]
138 {
139 if par_util::should_parallelize(v.len()) {
140 let (pmin, pmax, pmin_idx, pmax_idx, ptotal) =
142 par_util::thread_pool().install(|| {
143 v.par_iter()
144 .enumerate()
145 .fold(
146 || (f64::MAX, f64::MIN, 0usize, 0usize, 0.0f64),
147 |(mn, mx, mn_i, mx_i, s), (i, &elem)| {
148 let f = elem as f64;
149 let (new_mn, new_mn_i) =
150 if f < mn { (f, i) } else { (mn, mn_i) };
151 let (new_mx, new_mx_i) =
152 if f > mx { (f, i) } else { (mx, mx_i) };
153 (new_mn, new_mx, new_mn_i, new_mx_i, s + f)
154 },
155 )
156 .reduce(
157 || (f64::MAX, f64::MIN, 0, 0, 0.0),
158 |(mn1, mx1, mn_i1, mx_i1, s1), (mn2, mx2, mn_i2, mx_i2, s2)| {
159 let (rmn, rmn_i) = if mn1 <= mn2 {
160 (mn1, mn_i1)
161 } else {
162 (mn2, mn_i2)
163 };
164 let (rmx, rmx_i) = if mx1 >= mx2 {
165 (mx1, mx_i1)
166 } else {
167 (mx2, mx_i2)
168 };
169 (rmn, rmx, rmn_i, rmx_i, s1 + s2)
170 },
171 )
172 });
173 min = pmin;
174 max = pmax;
175 min_idx = pmin_idx;
176 max_idx = pmax_idx;
177 total = ptotal;
178 let mean_tmp = total / v.len() as f64;
179 variance = par_util::thread_pool().install(|| {
180 v.par_iter()
181 .map(|&elem| {
182 let d = elem as f64 - mean_tmp;
183 d * d
184 })
185 .sum::<f64>()
186 });
187 } else {
188 let mut lmin = v[0] as f64;
189 let mut lmax = v[0] as f64;
190 let mut lmin_idx: usize = 0;
191 let mut lmax_idx: usize = 0;
192 let mut ltotal = 0.0f64;
193 for (i, &elem) in v.iter().enumerate() {
194 let f = elem as f64;
195 if f < lmin {
196 lmin = f;
197 lmin_idx = i;
198 }
199 if f > lmax {
200 lmax = f;
201 lmax_idx = i;
202 }
203 ltotal += f;
204 }
205 min = lmin;
206 max = lmax;
207 min_idx = lmin_idx;
208 max_idx = lmax_idx;
209 total = ltotal;
210 let mean_tmp = total / v.len() as f64;
211 let mut lvar = 0.0f64;
212 for &elem in v.iter() {
213 let d = elem as f64 - mean_tmp;
214 lvar += d * d;
215 }
216 variance = lvar;
217 }
218 }
219
220 #[cfg(not(feature = "parallel"))]
221 {
222 let mut lmin = v[0] as f64;
223 let mut lmax = v[0] as f64;
224 let mut lmin_idx: usize = 0;
225 let mut lmax_idx: usize = 0;
226 let mut ltotal = 0.0f64;
227 for (i, &elem) in v.iter().enumerate() {
228 let f = elem as f64;
229 if f < lmin {
230 lmin = f;
231 lmin_idx = i;
232 }
233 if f > lmax {
234 lmax = f;
235 lmax_idx = i;
236 }
237 ltotal += f;
238 }
239 min = lmin;
240 max = lmax;
241 min_idx = lmin_idx;
242 max_idx = lmax_idx;
243 total = ltotal;
244 let mean_tmp = total / v.len() as f64;
245 let mut lvar = 0.0f64;
246 for &elem in v.iter() {
247 let d = elem as f64 - mean_tmp;
248 lvar += d * d;
249 }
250 variance = lvar;
251 }
252
253 let mean = total / v.len() as f64;
254 let sigma = (variance / v.len() as f64).sqrt();
255 let x_size = dims.first().map_or(v.len(), |d| d.size);
256
257 let net = if bgd_width > 0 && dims.len() >= 2 {
259 let y_size = dims[1].size;
260 let mut bgd_sum = 0.0f64;
261 let mut bgd_count = 0usize;
262 for iy in 0..y_size {
263 for ix in 0..x_size {
264 let is_edge = ix < bgd_width
265 || ix >= x_size.saturating_sub(bgd_width)
266 || iy < bgd_width
267 || iy >= y_size.saturating_sub(bgd_width);
268 if is_edge {
269 let idx = iy * x_size + ix;
270 if idx < v.len() {
271 bgd_sum += v[idx] as f64;
272 bgd_count += 1;
273 }
274 }
275 }
276 }
277 let bgd_avg = if bgd_count > 0 {
278 bgd_sum / bgd_count as f64
279 } else {
280 0.0
281 };
282 total - bgd_avg * v.len() as f64
283 } else {
284 total
285 };
286
287 StatsResult {
288 min,
289 max,
290 mean,
291 sigma,
292 total,
293 net,
294 num_elements: v.len(),
295 min_x: if x_size > 0 { min_idx % x_size } else { 0 },
296 min_y: if x_size > 0 { min_idx / x_size } else { 0 },
297 max_x: if x_size > 0 { max_idx % x_size } else { 0 },
298 max_y: if x_size > 0 { max_idx / x_size } else { 0 },
299 ..StatsResult::default()
300 }
301 }};
302 }
303
304 match data {
305 NDDataBuffer::I8(v) => stats_for!(v),
306 NDDataBuffer::U8(v) => stats_for!(v),
307 NDDataBuffer::I16(v) => stats_for!(v),
308 NDDataBuffer::U16(v) => stats_for!(v),
309 NDDataBuffer::I32(v) => stats_for!(v),
310 NDDataBuffer::U32(v) => stats_for!(v),
311 NDDataBuffer::I64(v) => stats_for!(v),
312 NDDataBuffer::U64(v) => stats_for!(v),
313 NDDataBuffer::F32(v) => stats_for!(v),
314 NDDataBuffer::F64(v) => stats_for!(v),
315 }
316}
317
318pub fn compute_centroid(
322 data: &NDDataBuffer,
323 x_size: usize,
324 y_size: usize,
325 threshold: f64,
326) -> CentroidResult {
327 let n = x_size * y_size;
328 if n == 0 || data.len() < n {
329 return CentroidResult::default();
330 }
331
332 let vals: Vec<f64> = (0..n).map(|i| data.get_as_f64(i).unwrap_or(0.0)).collect();
334
335 let (m00, m10, m01);
337
338 #[cfg(feature = "parallel")]
339 {
340 if par_util::should_parallelize(n) {
341 let xs = x_size;
342 let thr = threshold;
343 let (pm00, pm10, pm01) = par_util::thread_pool().install(|| {
344 vals.par_iter()
345 .enumerate()
346 .fold(
347 || (0.0f64, 0.0f64, 0.0f64),
348 |(s00, s10, s01), (i, &val)| {
349 if val < thr {
350 return (s00, s10, s01);
351 }
352 let ix = i % xs;
353 let iy = i / xs;
354 (s00 + val, s10 + val * ix as f64, s01 + val * iy as f64)
355 },
356 )
357 .reduce(
358 || (0.0, 0.0, 0.0),
359 |(a0, a1, a2), (b0, b1, b2)| (a0 + b0, a1 + b1, a2 + b2),
360 )
361 });
362 m00 = pm00;
363 m10 = pm10;
364 m01 = pm01;
365 } else {
366 let mut lm00 = 0.0f64;
367 let mut lm10 = 0.0f64;
368 let mut lm01 = 0.0f64;
369 for iy in 0..y_size {
370 for ix in 0..x_size {
371 let val = vals[iy * x_size + ix];
372 if val < threshold {
373 continue;
374 }
375 lm00 += val;
376 lm10 += val * ix as f64;
377 lm01 += val * iy as f64;
378 }
379 }
380 m00 = lm00;
381 m10 = lm10;
382 m01 = lm01;
383 }
384 }
385
386 #[cfg(not(feature = "parallel"))]
387 {
388 let mut lm00 = 0.0f64;
389 let mut lm10 = 0.0f64;
390 let mut lm01 = 0.0f64;
391 for iy in 0..y_size {
392 for ix in 0..x_size {
393 let val = vals[iy * x_size + ix];
394 if val < threshold {
395 continue;
396 }
397 lm00 += val;
398 lm10 += val * ix as f64;
399 lm01 += val * iy as f64;
400 }
401 }
402 m00 = lm00;
403 m10 = lm10;
404 m01 = lm01;
405 }
406
407 if m00 == 0.0 {
408 return CentroidResult::default();
409 }
410
411 let cx = m10 / m00;
412 let cy = m01 / m00;
413
414 let (mu20, mu02, mu11, m30_central, m03_central, m40_central, m04_central);
416
417 #[cfg(feature = "parallel")]
418 {
419 if par_util::should_parallelize(n) {
420 let xs = x_size;
421 let thr = threshold;
422 let (p20, p02, p11, p30, p03, p40, p04) = par_util::thread_pool().install(|| {
423 vals.par_iter()
424 .enumerate()
425 .fold(
426 || (0.0f64, 0.0f64, 0.0f64, 0.0f64, 0.0f64, 0.0f64, 0.0f64),
427 |(s20, s02, s11, s30, s03, s40, s04), (i, &val)| {
428 if val < thr {
429 return (s20, s02, s11, s30, s03, s40, s04);
430 }
431 let ix = i % xs;
432 let iy = i / xs;
433 let dx = ix as f64 - cx;
434 let dy = iy as f64 - cy;
435 let dx2 = dx * dx;
436 let dy2 = dy * dy;
437 (
438 s20 + val * dx2,
439 s02 + val * dy2,
440 s11 + val * dx * dy,
441 s30 + val * dx2 * dx,
442 s03 + val * dy2 * dy,
443 s40 + val * dx2 * dx2,
444 s04 + val * dy2 * dy2,
445 )
446 },
447 )
448 .reduce(
449 || (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0),
450 |(a0, a1, a2, a3, a4, a5, a6), (b0, b1, b2, b3, b4, b5, b6)| {
451 (
452 a0 + b0,
453 a1 + b1,
454 a2 + b2,
455 a3 + b3,
456 a4 + b4,
457 a5 + b5,
458 a6 + b6,
459 )
460 },
461 )
462 });
463 mu20 = p20;
464 mu02 = p02;
465 mu11 = p11;
466 m30_central = p30;
467 m03_central = p03;
468 m40_central = p40;
469 m04_central = p04;
470 } else {
471 let mut l20 = 0.0f64;
472 let mut l02 = 0.0f64;
473 let mut l11 = 0.0f64;
474 let mut l30 = 0.0f64;
475 let mut l03 = 0.0f64;
476 let mut l40 = 0.0f64;
477 let mut l04 = 0.0f64;
478 for iy in 0..y_size {
479 for ix in 0..x_size {
480 let val = vals[iy * x_size + ix];
481 if val < threshold {
482 continue;
483 }
484 let dx = ix as f64 - cx;
485 let dy = iy as f64 - cy;
486 let dx2 = dx * dx;
487 let dy2 = dy * dy;
488 l20 += val * dx2;
489 l02 += val * dy2;
490 l11 += val * dx * dy;
491 l30 += val * dx2 * dx;
492 l03 += val * dy2 * dy;
493 l40 += val * dx2 * dx2;
494 l04 += val * dy2 * dy2;
495 }
496 }
497 mu20 = l20;
498 mu02 = l02;
499 mu11 = l11;
500 m30_central = l30;
501 m03_central = l03;
502 m40_central = l40;
503 m04_central = l04;
504 }
505 }
506
507 #[cfg(not(feature = "parallel"))]
508 {
509 let mut l20 = 0.0f64;
510 let mut l02 = 0.0f64;
511 let mut l11 = 0.0f64;
512 let mut l30 = 0.0f64;
513 let mut l03 = 0.0f64;
514 let mut l40 = 0.0f64;
515 let mut l04 = 0.0f64;
516 for iy in 0..y_size {
517 for ix in 0..x_size {
518 let val = vals[iy * x_size + ix];
519 if val < threshold {
520 continue;
521 }
522 let dx = ix as f64 - cx;
523 let dy = iy as f64 - cy;
524 let dx2 = dx * dx;
525 let dy2 = dy * dy;
526 l20 += val * dx2;
527 l02 += val * dy2;
528 l11 += val * dx * dy;
529 l30 += val * dx2 * dx;
530 l03 += val * dy2 * dy;
531 l40 += val * dx2 * dx2;
532 l04 += val * dy2 * dy2;
533 }
534 }
535 mu20 = l20;
536 mu02 = l02;
537 mu11 = l11;
538 m30_central = l30;
539 m03_central = l03;
540 m40_central = l40;
541 m04_central = l04;
542 }
543
544 let sigma_x = (mu20 / m00).sqrt();
545 let sigma_y = (mu02 / m00).sqrt();
546 let sigma_xy = mu11 / m00;
547
548 let skewness_x = if sigma_x > 0.0 {
550 m30_central / (m00 * sigma_x.powi(3))
551 } else {
552 0.0
553 };
554 let skewness_y = if sigma_y > 0.0 {
555 m03_central / (m00 * sigma_y.powi(3))
556 } else {
557 0.0
558 };
559
560 let kurtosis_x = if sigma_x > 0.0 {
562 m40_central / (m00 * sigma_x.powi(4)) - 3.0
563 } else {
564 0.0
565 };
566 let kurtosis_y = if sigma_y > 0.0 {
567 m04_central / (m00 * sigma_y.powi(4)) - 3.0
568 } else {
569 0.0
570 };
571
572 let mu20_norm = mu20 / m00;
574 let mu02_norm = mu02 / m00;
575 let mu11_norm = mu11 / m00;
576 let denom = mu20_norm + mu02_norm;
577 let eccentricity = if denom > 0.0 {
578 ((mu20_norm - mu02_norm).powi(2) + 4.0 * mu11_norm.powi(2)) / denom.powi(2)
579 } else {
580 0.0
581 };
582
583 let orientation =
585 0.5 * (2.0 * mu11_norm).atan2(mu20_norm - mu02_norm) * 180.0 / std::f64::consts::PI;
586
587 CentroidResult {
588 centroid_x: cx,
589 centroid_y: cy,
590 sigma_x,
591 sigma_y,
592 sigma_xy,
593 centroid_total: m00,
594 skewness_x,
595 skewness_y,
596 kurtosis_x,
597 kurtosis_y,
598 eccentricity,
599 orientation,
600 }
601}
602
603pub fn compute_histogram(
612 data: &NDDataBuffer,
613 hist_size: usize,
614 hist_min: f64,
615 hist_max: f64,
616) -> (Vec<f64>, f64, f64, f64) {
617 if hist_size == 0 || hist_max <= hist_min {
618 return (vec![], 0.0, 0.0, 0.0);
619 }
620
621 let mut histogram = vec![0.0f64; hist_size];
622 let mut below = 0.0f64;
623 let mut above = 0.0f64;
624 let range = hist_max - hist_min;
625 let n = data.len();
626
627 #[cfg(feature = "parallel")]
628 let use_parallel = par_util::should_parallelize(n);
629 #[cfg(not(feature = "parallel"))]
630 let use_parallel = false;
631
632 if use_parallel {
633 #[cfg(feature = "parallel")]
634 {
635 let vals: Vec<f64> = (0..n).map(|i| data.get_as_f64(i).unwrap_or(0.0)).collect();
636 let chunk_size = (n / rayon::current_num_threads().max(1)).max(1024);
637 let hs = hist_size;
638 let hmin = hist_min;
639 let hmax = hist_max;
640 let rng = range;
641 let chunk_results: Vec<(Vec<f64>, f64, f64)> = par_util::thread_pool().install(|| {
642 vals.par_chunks(chunk_size)
643 .map(|chunk| {
644 let mut local_hist = vec![0.0f64; hs];
645 let mut local_below = 0.0f64;
646 let mut local_above = 0.0f64;
647 for &val in chunk {
648 if val < hmin {
649 local_below += 1.0;
650 } else if val > hmax {
651 local_above += 1.0;
652 } else {
653 let bin = ((val - hmin) * (hs - 1) as f64 / rng + 0.5) as usize;
654 let bin = bin.min(hs - 1);
655 local_hist[bin] += 1.0;
656 }
657 }
658 (local_hist, local_below, local_above)
659 })
660 .collect()
661 });
662 for (local_hist, local_below, local_above) in chunk_results {
663 below += local_below;
664 above += local_above;
665 for (i, &count) in local_hist.iter().enumerate() {
666 histogram[i] += count;
667 }
668 }
669 }
670 } else {
671 for i in 0..n {
672 let val = data.get_as_f64(i).unwrap_or(0.0);
673 if val < hist_min {
674 below += 1.0;
675 } else if val > hist_max {
676 above += 1.0;
677 } else {
678 let bin = ((val - hist_min) * (hist_size - 1) as f64 / range + 0.5) as usize;
679 let bin = bin.min(hist_size - 1);
680 histogram[bin] += 1.0;
681 }
682 }
683 }
684
685 let total_in_bins: f64 = histogram.iter().sum();
687 let entropy = if total_in_bins > 0.0 {
688 let mut ent = 0.0f64;
689 for &count in &histogram {
690 if count > 0.0 {
691 let p = count / total_in_bins;
692 ent -= p * p.ln();
693 }
694 }
695 ent
696 } else {
697 0.0
698 };
699
700 (histogram, below, above, entropy)
701}
702
703pub fn compute_profiles(
710 data: &NDDataBuffer,
711 x_size: usize,
712 y_size: usize,
713 threshold: f64,
714 centroid_x: f64,
715 centroid_y: f64,
716 cursor_x: usize,
717 cursor_y: usize,
718) -> ProfileResult {
719 if x_size == 0 || y_size == 0 || data.len() < x_size * y_size {
720 return ProfileResult::default();
721 }
722
723 let mut avg_x = vec![0.0f64; x_size];
724 let mut avg_y = vec![0.0f64; y_size];
725 let mut thresh_x_sum = vec![0.0f64; x_size];
726 let mut thresh_x_cnt = vec![0usize; x_size];
727 let mut thresh_y_sum = vec![0.0f64; y_size];
728 let mut thresh_y_cnt = vec![0usize; y_size];
729
730 for iy in 0..y_size {
732 for ix in 0..x_size {
733 let val = data.get_as_f64(iy * x_size + ix).unwrap_or(0.0);
734 avg_x[ix] += val;
735 avg_y[iy] += val;
736 if val >= threshold {
737 thresh_x_sum[ix] += val;
738 thresh_x_cnt[ix] += 1;
739 thresh_y_sum[iy] += val;
740 thresh_y_cnt[iy] += 1;
741 }
742 }
743 }
744
745 for ix in 0..x_size {
747 avg_x[ix] /= y_size as f64;
748 }
749 for iy in 0..y_size {
750 avg_y[iy] /= x_size as f64;
751 }
752
753 let threshold_x: Vec<f64> = thresh_x_sum
755 .iter()
756 .zip(thresh_x_cnt.iter())
757 .map(|(&s, &c)| if c > 0 { s / c as f64 } else { 0.0 })
758 .collect();
759 let threshold_y: Vec<f64> = thresh_y_sum
760 .iter()
761 .zip(thresh_y_cnt.iter())
762 .map(|(&s, &c)| if c > 0 { s / c as f64 } else { 0.0 })
763 .collect();
764
765 let cy_row = (centroid_y + 0.5) as usize;
767 let cx_col = (centroid_x + 0.5) as usize;
768
769 let centroid_x_profile = if cy_row < y_size {
770 (0..x_size)
771 .map(|ix| data.get_as_f64(cy_row * x_size + ix).unwrap_or(0.0))
772 .collect()
773 } else {
774 vec![0.0; x_size]
775 };
776
777 let centroid_y_profile = if cx_col < x_size {
778 (0..y_size)
779 .map(|iy| data.get_as_f64(iy * x_size + cx_col).unwrap_or(0.0))
780 .collect()
781 } else {
782 vec![0.0; y_size]
783 };
784
785 let cursor_x_profile = if cursor_y < y_size {
787 (0..x_size)
788 .map(|ix| data.get_as_f64(cursor_y * x_size + ix).unwrap_or(0.0))
789 .collect()
790 } else {
791 vec![0.0; x_size]
792 };
793
794 let cursor_y_profile = if cursor_x < x_size {
795 (0..y_size)
796 .map(|iy| data.get_as_f64(iy * x_size + cursor_x).unwrap_or(0.0))
797 .collect()
798 } else {
799 vec![0.0; y_size]
800 };
801
802 ProfileResult {
803 avg_x,
804 avg_y,
805 threshold_x,
806 threshold_y,
807 centroid_x: centroid_x_profile,
808 centroid_y: centroid_y_profile,
809 cursor_x: cursor_x_profile,
810 cursor_y: cursor_y_profile,
811 }
812}
813
814pub struct StatsProcessor {
816 latest_stats: Arc<Mutex<StatsResult>>,
817 do_compute_statistics: bool,
818 do_compute_centroid: bool,
819 do_compute_histogram: bool,
820 do_compute_profiles: bool,
821 bgd_width: usize,
822 centroid_threshold: f64,
823 cursor_x: usize,
824 cursor_y: usize,
825 hist_size: usize,
826 hist_min: f64,
827 hist_max: f64,
828 params: NDStatsParams,
829 params_out: Arc<Mutex<NDStatsParams>>,
831 ts_sender: Option<crate::time_series::TimeSeriesSender>,
833}
834
835impl StatsProcessor {
836 pub fn new() -> Self {
837 Self {
838 latest_stats: Arc::new(Mutex::new(StatsResult::default())),
839 do_compute_statistics: true,
840 do_compute_centroid: true,
841 do_compute_histogram: false,
842 do_compute_profiles: false,
843 bgd_width: 0,
844 centroid_threshold: 0.0,
845 cursor_x: 0,
846 cursor_y: 0,
847 hist_size: 256,
848 hist_min: 0.0,
849 hist_max: 255.0,
850 params: NDStatsParams::default(),
851 params_out: Arc::new(Mutex::new(NDStatsParams::default())),
852 ts_sender: None,
853 }
854 }
855
856 pub fn stats_handle(&self) -> Arc<Mutex<StatsResult>> {
858 self.latest_stats.clone()
859 }
860
861 pub fn params_handle(&self) -> Arc<Mutex<NDStatsParams>> {
863 self.params_out.clone()
864 }
865
866 pub fn set_ts_sender(&mut self, sender: crate::time_series::TimeSeriesSender) {
868 self.ts_sender = Some(sender);
869 }
870}
871
872impl Default for StatsProcessor {
873 fn default() -> Self {
874 Self::new()
875 }
876}
877
878impl NDPluginProcess for StatsProcessor {
879 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
880 let p = &self.params;
881 let info = array.info();
882
883 let mut result = if self.do_compute_statistics {
884 compute_stats(&array.data, &array.dims, self.bgd_width)
885 } else {
886 StatsResult::default()
887 };
888
889 let mut centroid = CentroidResult::default();
891 if self.do_compute_centroid {
892 if info.color_size == 1 && array.dims.len() >= 2 {
893 centroid = compute_centroid(
894 &array.data,
895 info.x_size,
896 info.y_size,
897 self.centroid_threshold,
898 );
899 }
900 }
901
902 if self.do_compute_histogram {
904 let (histogram, below, above, entropy) =
905 compute_histogram(&array.data, self.hist_size, self.hist_min, self.hist_max);
906 result.histogram = histogram;
907 result.hist_below = below;
908 result.hist_above = above;
909 result.hist_entropy = entropy;
910 }
911
912 if self.do_compute_profiles && info.color_size == 1 && array.dims.len() >= 2 {
914 let profiles = compute_profiles(
915 &array.data,
916 info.x_size,
917 info.y_size,
918 self.centroid_threshold,
919 centroid.centroid_x,
920 centroid.centroid_y,
921 self.cursor_x,
922 self.cursor_y,
923 );
924 result.profile_avg_x = profiles.avg_x;
925 result.profile_avg_y = profiles.avg_y;
926 result.profile_threshold_x = profiles.threshold_x;
927 result.profile_threshold_y = profiles.threshold_y;
928 result.profile_centroid_x = profiles.centroid_x;
929 result.profile_centroid_y = profiles.centroid_y;
930 result.profile_cursor_x = profiles.cursor_x;
931 result.profile_cursor_y = profiles.cursor_y;
932 }
933
934 let updates = vec![
935 ParamUpdate::float64(p.min_value, result.min),
936 ParamUpdate::float64(p.max_value, result.max),
937 ParamUpdate::float64(p.mean_value, result.mean),
938 ParamUpdate::float64(p.sigma_value, result.sigma),
939 ParamUpdate::float64(p.total, result.total),
940 ParamUpdate::float64(p.net, result.net),
941 ParamUpdate::float64(p.min_x, result.min_x as f64),
942 ParamUpdate::float64(p.min_y, result.min_y as f64),
943 ParamUpdate::float64(p.max_x, result.max_x as f64),
944 ParamUpdate::float64(p.max_y, result.max_y as f64),
945 ParamUpdate::float64(p.centroid_x, centroid.centroid_x),
946 ParamUpdate::float64(p.centroid_y, centroid.centroid_y),
947 ParamUpdate::float64(p.sigma_x, centroid.sigma_x),
948 ParamUpdate::float64(p.sigma_y, centroid.sigma_y),
949 ParamUpdate::float64(p.sigma_xy, centroid.sigma_xy),
950 ParamUpdate::float64(p.centroid_total, centroid.centroid_total),
951 ParamUpdate::float64(p.skewness_x, centroid.skewness_x),
952 ParamUpdate::float64(p.skewness_y, centroid.skewness_y),
953 ParamUpdate::float64(p.kurtosis_x, centroid.kurtosis_x),
954 ParamUpdate::float64(p.kurtosis_y, centroid.kurtosis_y),
955 ParamUpdate::float64(p.eccentricity, centroid.eccentricity),
956 ParamUpdate::float64(p.orientation, centroid.orientation),
957 ParamUpdate::float64(p.hist_below, result.hist_below),
958 ParamUpdate::float64(p.hist_above, result.hist_above),
959 ParamUpdate::float64(p.hist_entropy, result.hist_entropy),
960 ];
961
962 if let Some(ref sender) = self.ts_sender {
964 let ts_data = crate::time_series::TimeSeriesData {
965 values: vec![
966 result.min,
967 result.min_x as f64,
968 result.min_y as f64,
969 result.max,
970 result.max_x as f64,
971 result.max_y as f64,
972 result.mean,
973 result.sigma,
974 result.total,
975 result.net,
976 centroid.centroid_total,
977 centroid.centroid_x,
978 centroid.centroid_y,
979 centroid.sigma_x,
980 centroid.sigma_y,
981 centroid.sigma_xy,
982 centroid.skewness_x,
983 centroid.skewness_y,
984 centroid.kurtosis_x,
985 centroid.kurtosis_y,
986 centroid.eccentricity,
987 centroid.orientation,
988 array.timestamp.as_f64(),
989 ],
990 };
991 let _ = sender.try_send(ts_data);
992 }
993
994 *self.latest_stats.lock() = result;
995 ProcessResult::sink(updates)
996 }
997
998 fn plugin_type(&self) -> &str {
999 "NDPluginStats"
1000 }
1001
1002 fn register_params(
1003 &mut self,
1004 base: &mut PortDriverBase,
1005 ) -> Result<(), asyn_rs::error::AsynError> {
1006 self.params.compute_statistics =
1007 base.create_param("COMPUTE_STATISTICS", ParamType::Int32)?;
1008 base.set_int32_param(self.params.compute_statistics, 0, 1)?;
1009
1010 self.params.bgd_width = base.create_param("BGD_WIDTH", ParamType::Int32)?;
1011 self.params.min_value = base.create_param("MIN_VALUE", ParamType::Float64)?;
1012 self.params.max_value = base.create_param("MAX_VALUE", ParamType::Float64)?;
1013 self.params.mean_value = base.create_param("MEAN_VALUE", ParamType::Float64)?;
1014 self.params.sigma_value = base.create_param("SIGMA_VALUE", ParamType::Float64)?;
1015 self.params.total = base.create_param("TOTAL", ParamType::Float64)?;
1016 self.params.net = base.create_param("NET", ParamType::Float64)?;
1017 self.params.min_x = base.create_param("MIN_X", ParamType::Float64)?;
1018 self.params.min_y = base.create_param("MIN_Y", ParamType::Float64)?;
1019 self.params.max_x = base.create_param("MAX_X", ParamType::Float64)?;
1020 self.params.max_y = base.create_param("MAX_Y", ParamType::Float64)?;
1021
1022 self.params.compute_centroid = base.create_param("COMPUTE_CENTROID", ParamType::Int32)?;
1023 base.set_int32_param(self.params.compute_centroid, 0, 1)?;
1024
1025 self.params.centroid_threshold =
1026 base.create_param("CENTROID_THRESHOLD", ParamType::Float64)?;
1027 self.params.centroid_total = base.create_param("CENTROID_TOTAL", ParamType::Float64)?;
1028 self.params.centroid_x = base.create_param("CENTROIDX_VALUE", ParamType::Float64)?;
1029 self.params.centroid_y = base.create_param("CENTROIDY_VALUE", ParamType::Float64)?;
1030 self.params.sigma_x = base.create_param("SIGMAX_VALUE", ParamType::Float64)?;
1031 self.params.sigma_y = base.create_param("SIGMAY_VALUE", ParamType::Float64)?;
1032 self.params.sigma_xy = base.create_param("SIGMAXY_VALUE", ParamType::Float64)?;
1033 self.params.skewness_x = base.create_param("SKEWNESSX_VALUE", ParamType::Float64)?;
1034 self.params.skewness_y = base.create_param("SKEWNESSY_VALUE", ParamType::Float64)?;
1035 self.params.kurtosis_x = base.create_param("KURTOSISX_VALUE", ParamType::Float64)?;
1036 self.params.kurtosis_y = base.create_param("KURTOSISY_VALUE", ParamType::Float64)?;
1037 self.params.eccentricity = base.create_param("ECCENTRICITY_VALUE", ParamType::Float64)?;
1038 self.params.orientation = base.create_param("ORIENTATION_VALUE", ParamType::Float64)?;
1039
1040 self.params.compute_histogram = base.create_param("COMPUTE_HISTOGRAM", ParamType::Int32)?;
1041 self.params.hist_size = base.create_param("HIST_SIZE", ParamType::Int32)?;
1042 base.set_int32_param(self.params.hist_size, 0, 256)?;
1043 self.params.hist_min = base.create_param("HIST_MIN", ParamType::Float64)?;
1044 self.params.hist_max = base.create_param("HIST_MAX", ParamType::Float64)?;
1045 base.set_float64_param(self.params.hist_max, 0, 255.0)?;
1046 self.params.hist_below = base.create_param("HIST_BELOW", ParamType::Float64)?;
1047 self.params.hist_above = base.create_param("HIST_ABOVE", ParamType::Float64)?;
1048 self.params.hist_entropy = base.create_param("HIST_ENTROPY", ParamType::Float64)?;
1049
1050 self.params.compute_profiles = base.create_param("COMPUTE_PROFILES", ParamType::Int32)?;
1051 self.params.cursor_x = base.create_param("CURSOR_X", ParamType::Int32)?;
1052 base.set_int32_param(self.params.cursor_x, 0, 0)?;
1053 self.params.cursor_y = base.create_param("CURSOR_Y", ParamType::Int32)?;
1054 base.set_int32_param(self.params.cursor_y, 0, 0)?;
1055
1056 *self.params_out.lock() = self.params;
1058
1059 Ok(())
1060 }
1061
1062 fn on_param_change(
1063 &mut self,
1064 reason: usize,
1065 snapshot: &PluginParamSnapshot,
1066 ) -> ad_core_rs::plugin::runtime::ParamChangeResult {
1067 let p = &self.params;
1068 if reason == p.compute_statistics {
1069 self.do_compute_statistics = snapshot.value.as_i32() != 0;
1070 } else if reason == p.compute_centroid {
1071 self.do_compute_centroid = snapshot.value.as_i32() != 0;
1072 } else if reason == p.compute_histogram {
1073 self.do_compute_histogram = snapshot.value.as_i32() != 0;
1074 } else if reason == p.compute_profiles {
1075 self.do_compute_profiles = snapshot.value.as_i32() != 0;
1076 } else if reason == p.bgd_width {
1077 self.bgd_width = snapshot.value.as_i32().max(0) as usize;
1078 } else if reason == p.centroid_threshold {
1079 self.centroid_threshold = snapshot.value.as_f64();
1080 } else if reason == p.cursor_x {
1081 self.cursor_x = snapshot.value.as_i32().max(0) as usize;
1082 } else if reason == p.cursor_y {
1083 self.cursor_y = snapshot.value.as_i32().max(0) as usize;
1084 } else if reason == p.hist_size {
1085 self.hist_size = (snapshot.value.as_i32().max(1)) as usize;
1086 } else if reason == p.hist_min {
1087 self.hist_min = snapshot.value.as_f64();
1088 } else if reason == p.hist_max {
1089 self.hist_max = snapshot.value.as_f64();
1090 }
1091 ad_core_rs::plugin::runtime::ParamChangeResult::empty()
1092 }
1093}
1094
1095pub fn create_stats_runtime(
1101 port_name: &str,
1102 pool: Arc<NDArrayPool>,
1103 queue_size: usize,
1104 ndarray_port: &str,
1105 wiring: Arc<WiringRegistry>,
1106 ts_registry: &crate::time_series::TsReceiverRegistry,
1107) -> (
1108 PluginRuntimeHandle,
1109 Arc<Mutex<StatsResult>>,
1110 NDStatsParams,
1111 std::thread::JoinHandle<()>,
1112) {
1113 let (ts_tx, ts_rx) = tokio::sync::mpsc::channel(256);
1114
1115 let mut processor = StatsProcessor::new();
1116 processor.set_ts_sender(ts_tx);
1117 let stats_handle = processor.stats_handle();
1118 let params_handle = processor.params_handle();
1119
1120 let (plugin_handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime(
1121 port_name,
1122 processor,
1123 pool,
1124 queue_size,
1125 ndarray_port,
1126 wiring,
1127 );
1128
1129 let stats_params = *params_handle.lock();
1130
1131 let channel_names: Vec<String> = crate::time_series::STATS_TS_CHANNEL_NAMES
1133 .iter()
1134 .map(|s| s.to_string())
1135 .collect();
1136 ts_registry.store(port_name, ts_rx, channel_names);
1137
1138 (plugin_handle, stats_handle, stats_params, data_jh)
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143 use super::*;
1144 use ad_core_rs::ndarray::{NDDataType, NDDimension};
1145
1146 #[test]
1147 fn test_compute_stats_u8() {
1148 let dims = vec![NDDimension::new(5)];
1149 let data = NDDataBuffer::U8(vec![10, 20, 30, 40, 50]);
1150 let stats = compute_stats(&data, &dims, 0);
1151 assert_eq!(stats.min, 10.0);
1152 assert_eq!(stats.max, 50.0);
1153 assert_eq!(stats.mean, 30.0);
1154 assert_eq!(stats.total, 150.0);
1155 assert_eq!(stats.num_elements, 5);
1156 }
1157
1158 #[test]
1159 fn test_compute_stats_sigma() {
1160 let dims = vec![NDDimension::new(8)];
1161 let data = NDDataBuffer::F64(vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]);
1162 let stats = compute_stats(&data, &dims, 0);
1163 assert!((stats.mean - 5.0).abs() < 1e-10);
1164 assert!((stats.sigma - 2.0).abs() < 1e-10);
1165 }
1166
1167 #[test]
1168 fn test_compute_stats_u16() {
1169 let dims = vec![NDDimension::new(3)];
1170 let data = NDDataBuffer::U16(vec![100, 200, 300]);
1171 let stats = compute_stats(&data, &dims, 0);
1172 assert_eq!(stats.min, 100.0);
1173 assert_eq!(stats.max, 300.0);
1174 assert_eq!(stats.mean, 200.0);
1175 }
1176
1177 #[test]
1178 fn test_compute_stats_f64() {
1179 let dims = vec![NDDimension::new(3)];
1180 let data = NDDataBuffer::F64(vec![1.5, 2.5, 3.5]);
1181 let stats = compute_stats(&data, &dims, 0);
1182 assert!((stats.min - 1.5).abs() < 1e-10);
1183 assert!((stats.max - 3.5).abs() < 1e-10);
1184 assert!((stats.mean - 2.5).abs() < 1e-10);
1185 }
1186
1187 #[test]
1188 fn test_compute_stats_single_element() {
1189 let dims = vec![NDDimension::new(1)];
1190 let data = NDDataBuffer::I32(vec![42]);
1191 let stats = compute_stats(&data, &dims, 0);
1192 assert_eq!(stats.min, 42.0);
1193 assert_eq!(stats.max, 42.0);
1194 assert_eq!(stats.mean, 42.0);
1195 assert_eq!(stats.sigma, 0.0);
1196 assert_eq!(stats.num_elements, 1);
1197 }
1198
1199 #[test]
1200 fn test_compute_stats_empty() {
1201 let data = NDDataBuffer::U8(vec![]);
1202 let stats = compute_stats(&data, &[], 0);
1203 assert_eq!(stats.num_elements, 0);
1204 }
1205
1206 #[test]
1207 fn test_compute_stats_min_max_position() {
1208 let dims = vec![NDDimension::new(4), NDDimension::new(4)];
1209 let data = NDDataBuffer::U8((1..=16).collect());
1211 let stats = compute_stats(&data, &dims, 0);
1212 assert_eq!(stats.min_x, 0); assert_eq!(stats.min_y, 0);
1214 assert_eq!(stats.max_x, 3); assert_eq!(stats.max_y, 3);
1216 }
1217
1218 #[test]
1219 fn test_compute_stats_net_no_bgd() {
1220 let dims = vec![NDDimension::new(4), NDDimension::new(4)];
1221 let data = NDDataBuffer::U8((1..=16).collect());
1222 let stats = compute_stats(&data, &dims, 0);
1223 assert_eq!(stats.net, stats.total);
1225 }
1226
1227 #[test]
1228 fn test_compute_stats_bgd_subtraction() {
1229 let dims = vec![NDDimension::new(4), NDDimension::new(4)];
1231 let mut pixels = vec![10u16; 16];
1232 pixels[2 * 4 + 2] = 110;
1234 let data = NDDataBuffer::U16(pixels);
1235 let stats = compute_stats(&data, &dims, 1);
1236
1237 assert!((stats.net - 100.0).abs() < 1e-10);
1246 }
1247
1248 #[test]
1249 fn test_centroid_uniform() {
1250 let data = NDDataBuffer::U8(vec![1; 16]);
1251 let c = compute_centroid(&data, 4, 4, 0.0);
1252 assert!((c.centroid_x - 1.5).abs() < 1e-10);
1253 assert!((c.centroid_y - 1.5).abs() < 1e-10);
1254 }
1255
1256 #[test]
1257 fn test_centroid_corner() {
1258 let mut d = vec![0u8; 16];
1259 d[0] = 255;
1260 let data = NDDataBuffer::U8(d);
1261 let c = compute_centroid(&data, 4, 4, 0.0);
1262 assert!((c.centroid_x - 0.0).abs() < 1e-10);
1263 assert!((c.centroid_y - 0.0).abs() < 1e-10);
1264 }
1265
1266 #[test]
1267 fn test_centroid_threshold() {
1268 let mut pixels = vec![5u8; 16];
1270 pixels[2 * 4 + 2] = 100;
1271 let data = NDDataBuffer::U8(pixels);
1272
1273 let c = compute_centroid(&data, 4, 4, 50.0);
1275 assert!((c.centroid_x - 2.0).abs() < 1e-10);
1276 assert!((c.centroid_y - 2.0).abs() < 1e-10);
1277 assert!((c.centroid_total - 100.0).abs() < 1e-10);
1278 }
1279
1280 #[test]
1281 fn test_centroid_higher_moments_symmetric() {
1282 let data = NDDataBuffer::U8(vec![1; 16]);
1284 let c = compute_centroid(&data, 4, 4, 0.0);
1285 assert!(c.skewness_x.abs() < 1e-10);
1287 assert!(c.skewness_y.abs() < 1e-10);
1288 assert!(c.eccentricity.abs() < 1e-10);
1290 }
1291
1292 #[test]
1293 fn test_histogram_basic() {
1294 let data = NDDataBuffer::F64((0..10).map(|x| x as f64).collect());
1296 let (hist, below, above, entropy) = compute_histogram(&data, 10, 0.0, 9.0);
1297 assert_eq!(hist.len(), 10);
1298 assert_eq!(below, 0.0);
1299 assert_eq!(above, 0.0);
1300 let total: f64 = hist.iter().sum();
1302 assert!((total - 10.0).abs() < 1e-10);
1303 assert!((entropy - 10.0f64.ln()).abs() < 0.1);
1305 }
1306
1307 #[test]
1308 fn test_histogram_below_above() {
1309 let data = NDDataBuffer::F64(vec![-1.0, 0.5, 1.5, 3.0]);
1310 let (hist, below, above, _entropy) = compute_histogram(&data, 2, 0.0, 2.0);
1311 assert_eq!(below, 1.0); assert_eq!(above, 1.0); let total_in_bins: f64 = hist.iter().sum();
1314 assert!((total_in_bins - 2.0).abs() < 1e-10); }
1316
1317 #[test]
1318 fn test_histogram_single_value() {
1319 let data = NDDataBuffer::F64(vec![5.0; 100]);
1320 let (hist, below, above, entropy) = compute_histogram(&data, 10, 0.0, 10.0);
1321 assert_eq!(below, 0.0);
1322 assert_eq!(above, 0.0);
1323 assert!((entropy - 0.0).abs() < 1e-10);
1325 let total: f64 = hist.iter().sum();
1326 assert!((total - 100.0).abs() < 1e-10);
1327 }
1328
1329 #[test]
1330 fn test_profiles_8x8() {
1331 let mut pixels = vec![0.0f64; 64];
1333 for iy in 0..8 {
1334 for ix in 0..8 {
1335 pixels[iy * 8 + ix] = iy as f64;
1336 }
1337 }
1338 let data = NDDataBuffer::F64(pixels);
1339
1340 let profiles = compute_profiles(
1341 &data, 8, 8, 0.0, 3.5, 3.5, 0, 7, );
1347
1348 assert_eq!(profiles.avg_x.len(), 8);
1350 for &v in &profiles.avg_x {
1351 assert!((v - 3.5).abs() < 1e-10, "avg_x should be 3.5, got {v}");
1352 }
1353
1354 assert_eq!(profiles.avg_y.len(), 8);
1356 for (iy, &v) in profiles.avg_y.iter().enumerate() {
1357 assert!(
1358 (v - iy as f64).abs() < 1e-10,
1359 "avg_y[{iy}] should be {iy}, got {v}"
1360 );
1361 }
1362
1363 assert_eq!(profiles.cursor_x.len(), 8);
1365 for &v in &profiles.cursor_x {
1366 assert!((v - 7.0).abs() < 1e-10);
1367 }
1368
1369 assert_eq!(profiles.cursor_y.len(), 8);
1371 for (iy, &v) in profiles.cursor_y.iter().enumerate() {
1372 assert!((v - iy as f64).abs() < 1e-10);
1373 }
1374
1375 assert_eq!(profiles.centroid_x.len(), 8);
1377 for &v in &profiles.centroid_x {
1378 assert!((v - 4.0).abs() < 1e-10);
1379 }
1380
1381 assert_eq!(profiles.centroid_y.len(), 8);
1383 for (iy, &v) in profiles.centroid_y.iter().enumerate() {
1384 assert!((v - iy as f64).abs() < 1e-10);
1385 }
1386 }
1387
1388 #[test]
1389 fn test_profiles_threshold() {
1390 let mut pixels = vec![1.0f64; 16];
1392 pixels[1 * 4 + 2] = 10.0;
1393 let data = NDDataBuffer::F64(pixels);
1394
1395 let profiles = compute_profiles(
1396 &data, 4, 4, 5.0, 2.0, 1.0, 0, 0,
1398 );
1399
1400 assert_eq!(profiles.threshold_x.len(), 4);
1402 assert!((profiles.threshold_x[2] - 10.0).abs() < 1e-10);
1403 assert!((profiles.threshold_x[0] - 0.0).abs() < 1e-10);
1405 assert!((profiles.threshold_x[1] - 0.0).abs() < 1e-10);
1406 assert!((profiles.threshold_x[3] - 0.0).abs() < 1e-10);
1407
1408 assert_eq!(profiles.threshold_y.len(), 4);
1410 assert!((profiles.threshold_y[1] - 10.0).abs() < 1e-10);
1411 assert!((profiles.threshold_y[0] - 0.0).abs() < 1e-10);
1412 }
1413
1414 #[test]
1415 fn test_stats_processor_direct() {
1416 let mut proc = StatsProcessor::new();
1417 let pool = NDArrayPool::new(1_000_000);
1418
1419 let mut arr = NDArray::new(vec![NDDimension::new(5)], NDDataType::UInt8);
1420 if let NDDataBuffer::U8(ref mut v) = arr.data {
1421 v[0] = 10;
1422 v[1] = 20;
1423 v[2] = 30;
1424 v[3] = 40;
1425 v[4] = 50;
1426 }
1427
1428 let result = proc.process_array(&arr, &pool);
1429 assert!(result.output_arrays.is_empty(), "stats is a sink");
1430
1431 let stats = proc.stats_handle().lock().clone();
1432 assert_eq!(stats.min, 10.0);
1433 assert_eq!(stats.max, 50.0);
1434 assert_eq!(stats.mean, 30.0);
1435 }
1436
1437 #[test]
1438 fn test_stats_runtime_end_to_end() {
1439 let pool = Arc::new(NDArrayPool::new(1_000_000));
1440 let wiring = Arc::new(WiringRegistry::new());
1441 let ts_registry = crate::time_series::TsReceiverRegistry::new();
1442 let (handle, stats, _params, _jh) =
1443 create_stats_runtime("STATS_RT", pool, 10, "", wiring, &ts_registry);
1444
1445 handle
1447 .port_runtime()
1448 .port_handle()
1449 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1450 .unwrap();
1451 std::thread::sleep(std::time::Duration::from_millis(10));
1452
1453 let mut arr = NDArray::new(
1454 vec![NDDimension::new(4), NDDimension::new(4)],
1455 NDDataType::UInt8,
1456 );
1457 if let NDDataBuffer::U8(ref mut v) = arr.data {
1458 for (i, val) in v.iter_mut().enumerate() {
1459 *val = (i + 1) as u8;
1460 }
1461 }
1462
1463 handle.array_sender().send(Arc::new(arr));
1464 std::thread::sleep(std::time::Duration::from_millis(100));
1465
1466 let result = stats.lock().clone();
1467 assert_eq!(result.min, 1.0);
1468 assert_eq!(result.max, 16.0);
1469 assert_eq!(result.num_elements, 16);
1470 }
1471}