1use std::sync::Arc;
2use std::sync::atomic::{AtomicI32, AtomicU32, AtomicU64, Ordering};
3
4use parking_lot::Mutex;
5
6use crate::error::{ADError, ADResult};
7use crate::ndarray::{NDArray, NDDataBuffer, NDDataType, NDDimension};
8use crate::ndarray_handle::{NDArrayHandle, pooled_array};
9use crate::timestamp::EpicsTimestamp;
10
/// Largest allowed ratio between a free buffer's capacity and the requested
/// size. `NDArrayPool::alloc` discards (rather than reuses) the best-fitting
/// free buffer when its capacity exceeds the request by more than this factor.
const THRESHOLD_SIZE_RATIO: f64 = 1.5;

/// Counter from which every new `NDArrayPool` draws its `id`; starts at 1.
static NEXT_POOL_ID: AtomicU64 = AtomicU64::new(1);
19
20pub struct NDArrayPool {
27 id: u64,
29 max_memory: usize,
30 allocated_bytes: AtomicU64,
31 next_unique_id: AtomicI32,
32 free_list: Mutex<Vec<NDArray>>,
33 num_alloc_buffers: AtomicU32,
34 num_free_buffers: AtomicU32,
35}
36
37impl NDArrayPool {
38 pub fn new(max_memory: usize) -> Self {
39 Self {
40 id: NEXT_POOL_ID.fetch_add(1, Ordering::Relaxed),
41 max_memory,
42 allocated_bytes: AtomicU64::new(0),
43 next_unique_id: AtomicI32::new(1),
44 free_list: Mutex::new(Vec::new()),
45 num_alloc_buffers: AtomicU32::new(0),
46 num_free_buffers: AtomicU32::new(0),
47 }
48 }
49
50 pub fn id(&self) -> u64 {
52 self.id
53 }
54
55 pub fn alloc(&self, dims: Vec<NDDimension>, data_type: NDDataType) -> ADResult<NDArray> {
63 let num_elements: usize = dims.iter().map(|d| d.size).product();
64 let needed_bytes = num_elements * data_type.element_size();
65
66 let reused = {
70 let mut free = self.free_list.lock();
71 let mut best_idx = None;
72 let mut best_cap = usize::MAX;
73 for (i, arr) in free.iter().enumerate() {
74 let cap = arr.data.capacity_bytes();
75 if cap >= needed_bytes && cap < best_cap {
76 best_cap = cap;
77 best_idx = Some(i);
78 }
79 }
80 if let Some(idx) = best_idx {
81 if best_cap as f64 > needed_bytes as f64 * THRESHOLD_SIZE_RATIO {
82 let dropped = free.swap_remove(idx);
84 self.num_free_buffers.fetch_sub(1, Ordering::Relaxed);
85 self.allocated_bytes
86 .fetch_sub(dropped.data_size as u64, Ordering::Relaxed);
87 self.num_alloc_buffers.fetch_sub(1, Ordering::Relaxed);
88 None
89 } else {
90 let arr = free.swap_remove(idx);
91 self.num_free_buffers.fetch_sub(1, Ordering::Relaxed);
92 Some(arr)
93 }
94 } else {
95 None
96 }
97 };
98
99 let mut arr = if let Some(mut reused) = reused {
100 let old_size = reused.data_size;
106 if reused.data.data_type() != data_type {
107 reused.data = NDDataBuffer::zeros(data_type, num_elements);
108 } else {
109 reused.data.resize(num_elements);
110 }
111 let effective_size = if needed_bytes > old_size {
112 let diff = (needed_bytes - old_size) as u64;
113 if self.max_memory > 0 {
118 loop {
119 let current = self.allocated_bytes.load(Ordering::Relaxed);
120 if current + diff > self.max_memory as u64 {
121 let mut free = self.free_list.lock();
124 free.push(reused);
125 self.num_free_buffers.fetch_add(1, Ordering::Relaxed);
126 return Err(ADError::PoolExhausted(needed_bytes, self.max_memory));
127 }
128 if self
129 .allocated_bytes
130 .compare_exchange_weak(
131 current,
132 current + diff,
133 Ordering::Relaxed,
134 Ordering::Relaxed,
135 )
136 .is_ok()
137 {
138 break;
139 }
140 }
141 } else {
142 self.allocated_bytes.fetch_add(diff, Ordering::Relaxed);
143 }
144 needed_bytes
145 } else {
146 old_size
149 };
150 reused.data_size = effective_size;
151 reused.dims = dims;
152 reused.attributes.clear();
153 reused.codec = None;
154 reused
155 } else {
156 if self.max_memory > 0 {
159 loop {
160 let current = self.allocated_bytes.load(Ordering::Relaxed);
161 if current + needed_bytes as u64 > self.max_memory as u64 {
162 let mut freed_enough = false;
163 {
164 let mut free = self.free_list.lock();
165 free.sort_by(|a, b| {
166 b.data.capacity_bytes().cmp(&a.data.capacity_bytes())
167 });
168 let mut reclaimed = 0u64;
169 let over = (current + needed_bytes as u64)
170 .saturating_sub(self.max_memory as u64);
171 while !free.is_empty() && reclaimed < over {
172 let dropped = free.remove(0);
173 let dropped_size = dropped.data_size as u64;
174 self.allocated_bytes
175 .fetch_sub(dropped_size, Ordering::Relaxed);
176 self.num_free_buffers.fetch_sub(1, Ordering::Relaxed);
177 self.num_alloc_buffers.fetch_sub(1, Ordering::Relaxed);
178 reclaimed += dropped_size;
179 }
180 if reclaimed >= over {
181 freed_enough = true;
182 }
183 }
184 if !freed_enough {
185 return Err(ADError::PoolExhausted(needed_bytes, self.max_memory));
186 }
187 continue;
188 }
189 if self
190 .allocated_bytes
191 .compare_exchange_weak(
192 current,
193 current + needed_bytes as u64,
194 Ordering::Relaxed,
195 Ordering::Relaxed,
196 )
197 .is_ok()
198 {
199 break;
200 }
201 }
202 } else {
203 self.allocated_bytes
204 .fetch_add(needed_bytes as u64, Ordering::Relaxed);
205 }
206 self.num_alloc_buffers.fetch_add(1, Ordering::Relaxed);
207 NDArray::new(dims, data_type)
208 };
209
210 arr.unique_id = self.next_unique_id.fetch_add(1, Ordering::Relaxed);
211 arr.timestamp = EpicsTimestamp::now();
212 arr.pool_id = self.id;
213 Ok(arr)
216 }
217
218 pub fn alloc_copy(&self, source: &NDArray) -> ADResult<NDArray> {
221 let dims = source.dims.clone();
222 let data_type = source.data.data_type();
223 let mut copy = self.alloc(dims, data_type)?;
224 copy.data = source.data.clone();
225 copy.time_stamp = source.time_stamp;
226 copy.attributes = source.attributes.clone();
227 copy.codec = source.codec.clone();
228 Ok(copy)
229 }
230
231 pub fn release(&self, array: NDArray) {
238 if array.pool_id != self.id {
239 return;
243 }
244
245 let mut free = self.free_list.lock();
246 free.push(array);
247 self.num_free_buffers.fetch_add(1, Ordering::Relaxed);
248
249 let total = self.allocated_bytes.load(Ordering::Relaxed) as usize;
253 if self.max_memory > 0 && total > self.max_memory && !free.is_empty() {
254 free.sort_by(|a, b| b.data_size.cmp(&a.data_size));
255 let mut excess = total - self.max_memory;
256 while excess > 0 && !free.is_empty() {
257 let dropped = free.remove(0);
258 let dropped_size = dropped.data_size;
259 self.allocated_bytes
260 .fetch_sub(dropped_size as u64, Ordering::Relaxed);
261 self.num_free_buffers.fetch_sub(1, Ordering::Relaxed);
262 self.num_alloc_buffers.fetch_sub(1, Ordering::Relaxed);
263 if dropped_size >= excess {
264 break;
265 }
266 excess -= dropped_size;
267 }
268 }
269 }
270
271 pub fn empty_free_list(&self) {
273 let mut free = self.free_list.lock();
274 let count = free.len() as u32;
275 for arr in free.drain(..) {
276 self.allocated_bytes
277 .fetch_sub(arr.data_size as u64, Ordering::Relaxed);
278 self.num_alloc_buffers.fetch_sub(1, Ordering::Relaxed);
279 }
280 self.num_free_buffers.fetch_sub(count, Ordering::Relaxed);
281 }
282
283 pub fn allocated_bytes(&self) -> u64 {
284 self.allocated_bytes.load(Ordering::Relaxed)
285 }
286
287 pub fn num_alloc_buffers(&self) -> u32 {
288 self.num_alloc_buffers.load(Ordering::Relaxed)
289 }
290
291 pub fn num_free_buffers(&self) -> u32 {
292 self.num_free_buffers.load(Ordering::Relaxed)
293 }
294
295 pub fn max_memory(&self) -> usize {
296 self.max_memory
297 }
298
299 pub fn alloc_handle(
302 pool: &Arc<Self>,
303 dims: Vec<NDDimension>,
304 data_type: NDDataType,
305 ) -> ADResult<NDArrayHandle> {
306 let array = pool.alloc(dims, data_type)?;
307 Ok(pooled_array(array, pool))
308 }
309
310 pub fn copy(
322 &self,
323 src: &NDArray,
324 out: Option<NDArray>,
325 copy_data: bool,
326 copy_dimensions: bool,
327 copy_data_type: bool,
328 ) -> ADResult<NDArray> {
329 let mut out = match out {
330 Some(o) => o,
331 None => self.alloc(src.dims.clone(), src.data.data_type())?,
332 };
333
334 out.unique_id = src.unique_id;
335 out.time_stamp = src.time_stamp;
336 out.timestamp = src.timestamp;
337 if copy_dimensions {
338 out.dims = src.dims.clone();
339 }
340 out.codec = src.codec.clone();
341
342 if copy_data {
343 if copy_data_type && out.data.data_type() != src.data.data_type() {
344 out.data = src.data.clone();
346 } else if out.data.data_type() == src.data.data_type() {
347 out.data = src.data.clone();
348 } else {
349 out.data = crate::color::convert_data_type(src, out.data.data_type())?.data;
351 }
352 } else if copy_data_type && out.data.data_type() != src.data.data_type() {
353 out.data = NDDataBuffer::zeros(src.data.data_type(), out.data.len());
354 }
355
356 out.attributes.clear();
357 out.attributes.copy_from(&src.attributes);
358 Ok(out)
359 }
360
361 pub fn pre_allocate_buffers(&self, template_array: &NDArray, count: usize) -> ADResult<()> {
367 let mut buffers = Vec::with_capacity(count);
368 for _ in 0..count {
369 buffers.push(self.copy(template_array, None, true, true, true)?);
370 }
371 for arr in buffers {
372 self.release(arr);
373 }
374 Ok(())
375 }
376
377 pub fn convert_type(&self, src: &NDArray, target_type: NDDataType) -> ADResult<NDArray> {
380 if src.codec.is_some() {
382 return Err(ADError::UnsupportedConversion(
383 "convert_type: cannot convert compressed (codec) data".into(),
384 ));
385 }
386 if src.data.data_type() == target_type {
387 return self.alloc_copy(src);
388 }
389 let mut out = self.alloc(src.dims.clone(), target_type)?;
392 let converted = crate::color::convert_data_type(src, target_type)?;
393 out.data = converted.data;
394 out.time_stamp = src.time_stamp;
395 out.timestamp = src.timestamp;
396 out.attributes.copy_from(&src.attributes);
397 Ok(out)
398 }
399
400 pub fn convert(
411 &self,
412 src: &NDArray,
413 dims_out: &[NDDimension],
414 target_type: NDDataType,
415 ) -> ADResult<NDArray> {
416 if src.codec.is_some() {
418 return Err(ADError::UnsupportedConversion(
419 "convert: cannot convert compressed (codec) data".into(),
420 ));
421 }
422
423 let ndims = src.dims.len();
424 if dims_out.len() != ndims {
425 return Err(ADError::InvalidDimensions(format!(
426 "convert: dims_out length {} != source ndims {}",
427 dims_out.len(),
428 ndims,
429 )));
430 }
431
432 let mut out_sizes = Vec::with_capacity(ndims);
434 for (i, d) in dims_out.iter().enumerate() {
435 let bin = d.binning.max(1);
436 if d.size == 0 {
437 return Err(ADError::InvalidDimensions(format!(
438 "convert: dims_out[{}].size is 0",
439 i,
440 )));
441 }
442 let out_size = d.size / bin;
443 if out_size == 0 {
444 return Err(ADError::InvalidDimensions(format!(
445 "convert: dims_out[{}] size {} / binning {} = 0",
446 i, d.size, bin,
447 )));
448 }
449 if d.offset + d.size > src.dims[i].size {
451 return Err(ADError::InvalidDimensions(format!(
452 "convert: dims_out[{}] offset {} + size {} > src dim size {}",
453 i, d.offset, d.size, src.dims[i].size,
454 )));
455 }
456 out_sizes.push(out_size);
457 }
458
459 let src_type = src.data.data_type();
460
461 let mut out_dims = Vec::with_capacity(ndims);
466 for i in 0..ndims {
467 let bin = dims_out[i].binning.max(1);
468 out_dims.push(NDDimension {
469 size: out_sizes[i],
470 offset: src.dims[i].offset + dims_out[i].offset,
471 binning: src.dims[i].binning * bin,
472 reverse: dims_out[i].reverse ^ src.dims[i].reverse,
473 });
474 }
475
476 let total_out: usize = out_sizes.iter().product();
477
478 let mut src_strides = vec![1usize; ndims];
480 for i in 1..ndims {
481 src_strides[i] = src_strides[i - 1] * src.dims[i - 1].size;
482 }
483
484 let mut out_strides = vec![1usize; ndims];
486 for i in 1..ndims {
487 out_strides[i] = out_strides[i - 1] * out_sizes[i - 1];
488 }
489
490 macro_rules! convert_buf {
492 ($src_vec:expr, $T:ty, $zero:expr, $variant:ident) => {{
493 let mut out = vec![$zero; total_out];
494
495 for out_idx in 0..total_out {
497 let mut remaining = out_idx;
499 let mut out_coords = [0usize; 10]; for i in (0..ndims).rev() {
501 out_coords[i] = remaining / out_strides[i];
502 remaining %= out_strides[i];
503 }
504
505 let mut eff_coords = [0usize; 10];
507 for i in 0..ndims {
508 eff_coords[i] = if dims_out[i].reverse {
509 out_sizes[i] - 1 - out_coords[i]
510 } else {
511 out_coords[i]
512 };
513 }
514
515 let mut sum = 0.0f64;
517 let bin_total: usize = dims_out.iter().map(|d| d.binning.max(1)).product();
518
519 for bin_flat in 0..bin_total {
521 let mut br = bin_flat;
522 let mut src_flat = 0usize;
523 let mut valid = true;
524
525 for i in (0..ndims).rev() {
526 let bin = dims_out[i].binning.max(1);
527 let bin_off = br % bin;
528 br /= bin;
529
530 let src_coord = dims_out[i].offset + eff_coords[i] * bin + bin_off;
531 if src_coord >= src.dims[i].size {
532 valid = false;
533 break;
534 }
535 src_flat += src_coord * src_strides[i];
536 }
537
538 if valid {
539 sum += $src_vec[src_flat] as f64;
540 }
541 }
542
543 out[out_idx] = sum as $T;
544 }
545
546 NDDataBuffer::$variant(out)
547 }};
548 }
549
550 let out_data = match &src.data {
551 NDDataBuffer::I8(v) => convert_buf!(v, i8, 0i8, I8),
552 NDDataBuffer::U8(v) => convert_buf!(v, u8, 0u8, U8),
553 NDDataBuffer::I16(v) => convert_buf!(v, i16, 0i16, I16),
554 NDDataBuffer::U16(v) => convert_buf!(v, u16, 0u16, U16),
555 NDDataBuffer::I32(v) => convert_buf!(v, i32, 0i32, I32),
556 NDDataBuffer::U32(v) => convert_buf!(v, u32, 0u32, U32),
557 NDDataBuffer::I64(v) => convert_buf!(v, i64, 0i64, I64),
558 NDDataBuffer::U64(v) => convert_buf!(v, u64, 0u64, U64),
559 NDDataBuffer::F32(v) => convert_buf!(v, f32, 0.0f32, F32),
560 NDDataBuffer::F64(v) => convert_buf!(v, f64, 0.0f64, F64),
561 };
562
563 let mut arr = self.alloc(out_dims, target_type)?;
567 arr.timestamp = src.timestamp;
568 arr.time_stamp = src.time_stamp;
569 arr.attributes.copy_from(&src.attributes);
570
571 if target_type != src_type {
574 let staging = NDArray::new(arr.dims.clone(), src_type);
575 let mut staging = staging;
576 staging.data = out_data;
577 let converted = crate::color::convert_data_type(&staging, target_type)?;
578 arr.data = converted.data;
579 } else {
580 arr.data = out_data;
581 }
582
583 Ok(arr)
584 }
585
586 pub fn report(&self, details: i32) -> String {
590 let mut out = String::new();
591 out.push('\n');
592 out.push_str("NDArrayPool:\n");
593 out.push_str(&format!(
594 " numBuffers={}, numFree={}\n",
595 self.num_alloc_buffers(),
596 self.num_free_buffers()
597 ));
598 out.push_str(&format!(
599 " memorySize={}, maxMemory={}\n",
600 self.allocated_bytes(),
601 self.max_memory
602 ));
603 if details > 5 {
604 let free = self.free_list.lock();
605 out.push_str(" freeList: (index, dataSize, capacity)\n");
606 for (i, arr) in free.iter().enumerate() {
607 out.push_str(&format!(
608 " {} {} {}\n",
609 i,
610 arr.data_size,
611 arr.data.capacity_bytes()
612 ));
613 }
614 if details > 10 {
615 for arr in free.iter() {
616 out.push_str(&arr.report(details));
617 }
618 }
619 }
620 out
621 }
622}
623
624const _: fn() = || {
626 fn assert_send_sync<T: Send + Sync>() {}
627 assert_send_sync::<NDArrayPool>();
628};
629
630#[cfg(test)]
631mod tests {
632 use super::*;
633
/// Consecutive allocations receive unique ids 1 and 2.
#[test]
fn test_alloc_auto_id() {
    let pool = NDArrayPool::new(1_000_000);
    let request = || {
        pool.alloc(vec![NDDimension::new(10)], NDDataType::UInt8)
            .unwrap()
    };
    let first = request();
    let second = request();
    assert_eq!((first.unique_id, second.unique_id), (1, 2));
}
646
/// Allocating 100 `Float64` elements charges at least 800 bytes.
#[test]
fn test_alloc_tracks_bytes() {
    let pool = NDArrayPool::new(1_000_000);
    let dims = vec![NDDimension::new(100)];
    let _ = pool.alloc(dims, NDDataType::Float64).unwrap();
    let tracked = pool.allocated_bytes();
    assert!(tracked >= 800);
}
655
/// A request larger than the pool limit is refused.
#[test]
fn test_alloc_exceeds_max() {
    let pool = NDArrayPool::new(100);
    let oversized = vec![NDDimension::new(200)];
    assert!(pool.alloc(oversized, NDDataType::UInt8).is_err());
}
662
/// `alloc_copy` duplicates the data and dimensions under a new unique id.
#[test]
fn test_alloc_copy_preserves_data() {
    let pool = NDArrayPool::new(1_000_000);
    let mut source = pool
        .alloc(vec![NDDimension::new(4)], NDDataType::UInt8)
        .unwrap();
    if let NDDataBuffer::U8(values) = &mut source.data {
        values[..4].copy_from_slice(&[1, 2, 3, 4]);
    }

    let copy = pool.alloc_copy(&source).unwrap();
    assert_ne!(copy.unique_id, source.unique_id);
    assert_eq!(copy.dims.len(), source.dims.len());
    match &copy.data {
        NDDataBuffer::U8(values) => assert_eq!(values.as_slice(), &[1, 2, 3, 4]),
        _ => panic!("wrong type"),
    }
}
685
/// A copy is charged to the pool in addition to its source.
#[test]
fn test_alloc_copy_tracks_bytes() {
    let pool = NDArrayPool::new(1_000_000);
    let dims = vec![NDDimension::new(10)];
    let source = pool.alloc(dims, NDDataType::UInt16).unwrap();
    assert_eq!(pool.allocated_bytes(), 20);

    let _ = pool.alloc_copy(&source).unwrap();
    let after_copy = pool.allocated_bytes();
    assert!(after_copy >= 40);
}
696
/// Copying fails when the copy would push the pool over its limit.
#[test]
fn test_alloc_copy_exceeds_max() {
    let pool = NDArrayPool::new(60);
    let dims = vec![NDDimension::new(50)];
    let source = pool.alloc(dims, NDDataType::UInt8).unwrap();
    let second = pool.alloc_copy(&source);
    assert!(second.is_err());
}
705
/// A released buffer is handed out again for a slightly smaller request.
///
/// Besides the resulting length, this checks that the allocation really came
/// from the free list: the free list is empty afterwards, the pool still
/// owns exactly one buffer, and no additional bytes were charged.
#[test]
fn test_release_and_reuse() {
    let pool = NDArrayPool::new(1_000_000);
    let arr = pool
        .alloc(vec![NDDimension::new(100)], NDDataType::UInt8)
        .unwrap();
    let bytes_after_first = pool.allocated_bytes();
    assert_eq!(pool.num_alloc_buffers(), 1);

    pool.release(arr);
    assert_eq!(pool.num_free_buffers(), 1);

    // 80 bytes fits the 100-byte buffer and is within the reuse threshold.
    let arr2 = pool
        .alloc(vec![NDDimension::new(80)], NDDataType::UInt8)
        .unwrap();
    assert_eq!(arr2.data.len(), 80);
    assert_eq!(pool.num_free_buffers(), 0);
    assert_eq!(pool.num_alloc_buffers(), 1);
    assert_eq!(pool.allocated_bytes(), bytes_after_first);
}
727
/// With three free buffers available, a 900-byte request gets a buffer of
/// sufficient capacity.
#[test]
fn test_free_list_prefers_smallest_sufficient() {
    let pool = NDArrayPool::new(10_000_000);
    let request = |len| {
        pool.alloc(vec![NDDimension::new(len)], NDDataType::UInt8)
            .unwrap()
    };
    let small = request(100);
    let large = request(10000);
    let medium = request(1000);

    for arr in [large, medium, small] {
        pool.release(arr);
    }
    assert_eq!(pool.num_free_buffers(), 3);

    let reused = request(900);
    assert!(reused.data.capacity_bytes() >= 900);
}
752
/// Emptying the free list resets both buffer counters to zero.
#[test]
fn test_empty_free_list() {
    let pool = NDArrayPool::new(1_000_000);
    let request = |len| {
        pool.alloc(vec![NDDimension::new(len)], NDDataType::UInt8)
            .unwrap()
    };
    let a1 = request(100);
    let a2 = request(200);
    pool.release(a1);
    pool.release(a2);
    assert_eq!(pool.num_free_buffers(), 2);

    pool.empty_free_list();
    assert_eq!(
        (pool.num_free_buffers(), pool.num_alloc_buffers()),
        (0, 0)
    );
}
770
/// The free-buffer counter follows alloc / release / re-alloc.
#[test]
fn test_num_free_buffers_tracking() {
    let pool = NDArrayPool::new(1_000_000);
    let request = || {
        pool.alloc(vec![NDDimension::new(10)], NDDataType::UInt8)
            .unwrap()
    };
    assert_eq!(pool.num_free_buffers(), 0);

    let a = request();
    assert_eq!(pool.num_free_buffers(), 0);

    pool.release(a);
    assert_eq!(pool.num_free_buffers(), 1);

    let _ = request();
    assert_eq!(pool.num_free_buffers(), 0);
}
789
/// Four threads each allocate and release 100 arrays; afterwards at least
/// one buffer is on the free list.
#[test]
fn test_concurrent_alloc_release() {
    use std::sync::Arc;
    use std::thread;

    let pool = Arc::new(NDArrayPool::new(10_000_000));

    let workers: Vec<_> = (0..4)
        .map(|_| {
            let pool = pool.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    let arr = pool
                        .alloc(vec![NDDimension::new(100)], NDDataType::UInt8)
                        .unwrap();
                    pool.release(arr);
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    assert!(pool.num_free_buffers() > 0);
}
817
/// The getter reports the limit passed to `new`.
#[test]
fn test_max_memory() {
    let limit = 42;
    let pool = NDArrayPool::new(limit);
    assert_eq!(pool.max_memory(), 42);
}
823
/// Converting to the source's own type returns an identical copy.
#[test]
fn test_convert_type_same_type() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
    if let NDDataBuffer::U8(values) = &mut src.data {
        values[..4].copy_from_slice(&[10, 20, 30, 40]);
    }

    let out = pool.convert_type(&src, NDDataType::UInt8).unwrap();
    assert_eq!(out.data.data_type(), NDDataType::UInt8);
    match &out.data {
        NDDataBuffer::U8(values) => assert_eq!(values.as_slice(), &[10, 20, 30, 40]),
        _ => panic!("wrong type"),
    }
}
845
/// `UInt8` values keep their numeric value when converted to `Float32`.
#[test]
fn test_convert_type_u8_to_f32() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(vec![NDDimension::new(3)], NDDataType::UInt8);
    if let NDDataBuffer::U8(values) = &mut src.data {
        values[..3].copy_from_slice(&[0, 128, 255]);
    }

    let out = pool.convert_type(&src, NDDataType::Float32).unwrap();
    assert_eq!(out.data.data_type(), NDDataType::Float32);
    match &out.data {
        NDDataBuffer::F32(values) => {
            for (index, expected) in [0.0, 128.0, 255.0].into_iter().enumerate() {
                assert_eq!(values[index], expected);
            }
        }
        _ => panic!("wrong type"),
    }
}
866
/// Narrowing `UInt16` to `UInt8` keeps in-range values and yields 255 for 300.
#[test]
fn test_convert_type_u16_to_u8() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(vec![NDDimension::new(2)], NDDataType::UInt16);
    if let NDDataBuffer::U16(values) = &mut src.data {
        values[..2].copy_from_slice(&[100, 300]);
    }

    let out = pool.convert_type(&src, NDDataType::UInt8).unwrap();
    match &out.data {
        NDDataBuffer::U8(values) => {
            assert_eq!(values[0], 100);
            assert_eq!(values[1], 255);
        }
        _ => panic!("wrong type"),
    }
}
884
885 fn make_4x4_u8() -> NDArray {
889 let mut arr = NDArray::new(
890 vec![NDDimension::new(4), NDDimension::new(4)],
891 NDDataType::UInt8,
892 );
893 if let NDDataBuffer::U8(ref mut v) = arr.data {
894 for i in 0..16 {
895 v[i] = i as u8;
896 }
897 }
898 arr
899 }
900
/// A full-size, unbinned, unreversed conversion reproduces the source.
#[test]
fn test_convert_identity() {
    let pool = NDArrayPool::new(1_000_000);
    let src = make_4x4_u8();
    let whole = |size| NDDimension {
        size,
        offset: 0,
        binning: 1,
        reverse: false,
    };
    let dims_out = [whole(4), whole(4)];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    assert_eq!(out.dims[0].size, 4);
    assert_eq!(out.dims[1].size, 4);
    match &out.data {
        NDDataBuffer::U8(values) => {
            for (index, &value) in values[..16].iter().enumerate() {
                assert_eq!(value, index as u8);
            }
        }
        _ => panic!("wrong type"),
    }
}
932
/// Extracting the central 2x2 region of the 4x4 ramp yields 5, 6, 9, 10 and
/// records the offsets in the output dimensions.
#[test]
fn test_convert_offset_extraction() {
    let pool = NDArrayPool::new(1_000_000);
    let src = make_4x4_u8();
    let window = || NDDimension {
        size: 2,
        offset: 1,
        binning: 1,
        reverse: false,
    };
    let dims_out = [window(), window()];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    assert_eq!(out.dims[0].size, 2);
    assert_eq!(out.dims[1].size, 2);
    match &out.data {
        NDDataBuffer::U8(values) => assert_eq!(&values[..4], &[5, 6, 9, 10]),
        _ => panic!("wrong type"),
    }

    assert_eq!(out.dims[0].offset, 1);
    assert_eq!(out.dims[1].offset, 1);
}
972
/// 2x2 binning of the 4x4 ramp sums each quadrant: 10, 18, 42, 50.
#[test]
fn test_convert_binning_2x2() {
    let pool = NDArrayPool::new(1_000_000);
    let src = make_4x4_u8();
    let binned = || NDDimension {
        size: 4,
        offset: 0,
        binning: 2,
        reverse: false,
    };
    let dims_out = [binned(), binned()];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    assert_eq!(out.dims[0].size, 2);
    assert_eq!(out.dims[1].size, 2);
    match &out.data {
        NDDataBuffer::U8(values) => assert_eq!(&values[..4], &[10, 18, 42, 50]),
        _ => panic!("wrong type"),
    }

    assert_eq!(out.dims[0].binning, 2);
    assert_eq!(out.dims[1].binning, 2);
}
1013
/// Reversing dimension 0 of a 4x1 array flips the element order.
#[test]
fn test_convert_reverse_x() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(
        vec![NDDimension::new(4), NDDimension::new(1)],
        NDDataType::UInt8,
    );
    if let NDDataBuffer::U8(values) = &mut src.data {
        values[..4].copy_from_slice(&[10, 20, 30, 40]);
    }

    let dim = |size, reverse| NDDimension {
        size,
        offset: 0,
        binning: 1,
        reverse,
    };
    let dims_out = [dim(4, true), dim(1, false)];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    match &out.data {
        NDDataBuffer::U8(values) => assert_eq!(&values[..4], &[40, 30, 20, 10]),
        _ => panic!("wrong type"),
    }
}
1054
/// Reversing dimension 1 of a 2x2 array swaps its two rows.
#[test]
fn test_convert_reverse_y() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(
        vec![NDDimension::new(2), NDDimension::new(2)],
        NDDataType::UInt16,
    );
    if let NDDataBuffer::U16(values) = &mut src.data {
        values[..4].copy_from_slice(&[1, 2, 3, 4]);
    }

    let dim = |reverse| NDDimension {
        size: 2,
        offset: 0,
        binning: 1,
        reverse,
    };
    let dims_out = [dim(false), dim(true)];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt16).unwrap();
    match &out.data {
        NDDataBuffer::U16(values) => assert_eq!(&values[..4], &[3, 4, 1, 2]),
        _ => panic!("wrong type"),
    }
}
1097
/// Binning combined with a `UInt8` -> `Float32` conversion.
#[test]
fn test_convert_with_type_change() {
    let pool = NDArrayPool::new(1_000_000);
    let src = make_4x4_u8();
    let binned = || NDDimension {
        size: 4,
        offset: 0,
        binning: 2,
        reverse: false,
    };
    let dims_out = [binned(), binned()];

    let out = pool.convert(&src, &dims_out, NDDataType::Float32).unwrap();
    assert_eq!(out.data.data_type(), NDDataType::Float32);
    assert_eq!(out.dims[0].size, 2);
    assert_eq!(out.dims[1].size, 2);
    match &out.data {
        NDDataBuffer::F32(values) => {
            assert_eq!(values[0], 10.0);
            assert_eq!(values[1], 18.0);
        }
        _ => panic!("wrong type"),
    }
}
1129
/// Output dimensions add the requested offset to the source's offset and
/// multiply the binning factors.
#[test]
fn test_convert_cumulative_offset_and_binning() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(
        vec![NDDimension::new(4), NDDimension::new(4)],
        NDDataType::UInt8,
    );
    for (axis, (offset, binning)) in [(10, 2), (20, 3)].into_iter().enumerate() {
        src.dims[axis].offset = offset;
        src.dims[axis].binning = binning;
    }
    if let NDDataBuffer::U8(values) = &mut src.data {
        for (index, value) in values[..16].iter_mut().enumerate() {
            *value = index as u8;
        }
    }

    let region = || NDDimension {
        size: 2,
        offset: 1,
        binning: 2,
        reverse: false,
    };
    let dims_out = [region(), region()];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    assert_eq!(out.dims[0].offset, 10 + 1);
    assert_eq!(out.dims[1].offset, 20 + 1);
    assert_eq!(out.dims[0].binning, 2 * 2);
    assert_eq!(out.dims[1].binning, 3 * 2);
}
1171
/// 1-D extraction with offset 2, size 4 and binning 2 sums pairs:
/// 20 + 30 = 50 and 40 + 50 = 90.
#[test]
fn test_convert_1d() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(vec![NDDimension::new(8)], NDDataType::UInt16);
    if let NDDataBuffer::U16(values) = &mut src.data {
        for (index, value) in values[..8].iter_mut().enumerate() {
            *value = (index * 10) as u16;
        }
    }

    let dims_out = [NDDimension {
        size: 4,
        offset: 2,
        binning: 2,
        reverse: false,
    }];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt16).unwrap();
    assert_eq!(out.dims.len(), 1);
    assert_eq!(out.dims[0].size, 2);
    match &out.data {
        NDDataBuffer::U16(values) => {
            assert_eq!(values[0], 50);
            assert_eq!(values[1], 90);
        }
        _ => panic!("wrong type"),
    }
}
1203
/// An identity conversion of a 2x2x2 array preserves all eight values.
#[test]
fn test_convert_3d() {
    let pool = NDArrayPool::new(1_000_000);
    let src_dims: Vec<NDDimension> = (0..3).map(|_| NDDimension::new(2)).collect();
    let mut src = NDArray::new(src_dims, NDDataType::UInt8);
    if let NDDataBuffer::U8(values) = &mut src.data {
        for (index, value) in values[..8].iter_mut().enumerate() {
            *value = (index + 1) as u8;
        }
    }

    let dims_out: Vec<NDDimension> = (0..3)
        .map(|_| NDDimension {
            size: 2,
            offset: 0,
            binning: 1,
            reverse: false,
        })
        .collect();

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    match &out.data {
        NDDataBuffer::U8(values) => {
            for (index, &value) in values[..8].iter().enumerate() {
                assert_eq!(value, (index + 1) as u8);
            }
        }
        _ => panic!("wrong type"),
    }
}
1252
/// Passing one output dimension for a 2-D source is an error.
#[test]
fn test_convert_dim_mismatch_error() {
    let pool = NDArrayPool::new(1_000_000);
    let src = make_4x4_u8();
    let dims_out = [NDDimension {
        size: 4,
        offset: 0,
        binning: 1,
        reverse: false,
    }];

    assert!(pool.convert(&src, &dims_out, NDDataType::UInt8).is_err());
}
1268
/// An offset + size that runs past the source dimension is an error.
#[test]
fn test_convert_offset_out_of_bounds_error() {
    let pool = NDArrayPool::new(1_000_000);
    let src = make_4x4_u8();
    let dim = |offset| NDDimension {
        size: 4,
        offset,
        binning: 1,
        reverse: false,
    };
    // 2 + 4 exceeds the source size of 4 in dimension 0.
    let dims_out = [dim(2), dim(0)];

    assert!(pool.convert(&src, &dims_out, NDDataType::UInt8).is_err());
}
1291
/// `convert` carries the source's `time_stamp` over to the output.
#[test]
fn test_convert_preserves_metadata() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = make_4x4_u8();
    src.time_stamp = 12345.678;

    let whole = || NDDimension {
        size: 4,
        offset: 0,
        binning: 1,
        reverse: false,
    };
    let dims_out = [whole(), whole()];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    assert_eq!(out.time_stamp, 12345.678);
}
1316
/// Binning by 2 with reversal: the last pair (3 + 4 = 7) comes first, then
/// the first pair (1 + 2 = 3).
#[test]
fn test_convert_binning_and_reverse_combined() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt16);
    if let NDDataBuffer::U16(values) = &mut src.data {
        values[..4].copy_from_slice(&[1, 2, 3, 4]);
    }

    let dims_out = [NDDimension {
        size: 4,
        offset: 0,
        binning: 2,
        reverse: true,
    }];

    let out = pool.convert(&src, &dims_out, NDDataType::UInt16).unwrap();
    assert_eq!(out.dims[0].size, 2);
    match &out.data {
        NDDataBuffer::U16(values) => {
            assert_eq!(values[0], 7);
            assert_eq!(values[1], 3);
        }
        _ => panic!("wrong type"),
    }
}
1349
/// The output reverse flag is the XOR of the source's and the requested flag.
#[test]
fn test_convert_reverse_flag_cumulative() {
    let pool = NDArrayPool::new(1_000_000);
    let mut src = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
    src.dims[0].reverse = true;

    let request = |reverse| {
        [NDDimension {
            size: 4,
            offset: 0,
            binning: 1,
            reverse,
        }]
    };

    let dims_out = request(true);
    let out = pool.convert(&src, &dims_out, NDDataType::UInt8).unwrap();
    assert!(!out.dims[0].reverse, "true XOR true must be false");

    let dims_out2 = request(false);
    let out2 = pool.convert(&src, &dims_out2, NDDataType::UInt8).unwrap();
    assert!(out2.dims[0].reverse, "false XOR true must be true");
}
1379
/// With no memory limit, 333 `UInt16` elements are charged as exactly 666 bytes.
#[test]
fn test_alloc_tracks_exact_data_size() {
    let pool = NDArrayPool::new(0);
    let dims = vec![NDDimension::new(333)];
    let a = pool.alloc(dims, NDDataType::UInt16).unwrap();
    assert_eq!(a.data_size, 666);
    assert_eq!(pool.allocated_bytes(), 666);
}
1391
/// A second allocation that would exceed the limit fails and leaves the
/// byte counter within the limit.
#[test]
fn test_alloc_strict_max_memory_enforcement() {
    let pool = NDArrayPool::new(1000);
    let request = |len| pool.alloc(vec![NDDimension::new(len)], NDDataType::UInt8);

    let _a = request(600).unwrap();
    assert_eq!(pool.allocated_bytes(), 600);

    let r = request(500);
    assert!(r.is_err());
    assert!(pool.allocated_bytes() <= 1000);
}
1406
/// The array produced by `convert` must be owned and accounted for by the
/// pool: it carries the pool's id, bumps `num_alloc_buffers` by one and adds
/// its 16 bytes to `allocated_bytes`. Releasing it puts it on the free list
/// while the bytes stay accounted.
#[test]
fn test_convert_output_is_pool_tracked() {
    // Full-width, unbinned, non-reversed axis of the 4x4 source.
    let whole_axis = || NDDimension {
        size: 4,
        offset: 0,
        binning: 1,
        reverse: false,
    };

    let pool = NDArrayPool::new(1_000_000);
    let input = make_4x4_u8();
    let buffers_before = pool.num_alloc_buffers();
    let requested = vec![whole_axis(), whole_axis()];

    let converted = pool
        .convert(&input, &requested, NDDataType::UInt8)
        .unwrap();
    assert_eq!(converted.pool_id, pool.id());
    assert_eq!(converted.data_size, 16);
    assert_eq!(pool.num_alloc_buffers(), buffers_before + 1);
    assert_eq!(pool.allocated_bytes(), 16);

    pool.release(converted);
    assert_eq!(pool.num_free_buffers(), 1);
    assert_eq!(pool.allocated_bytes(), 16);
}
1439
/// Both `convert` and `convert_type` must return an error when the source
/// array has a codec attached (i.e. is marked as compressed).
#[test]
fn test_convert_rejects_compressed_input() {
    let pool = NDArrayPool::new(1_000_000);

    // Tag an otherwise ordinary 4x4 array as LZ4-compressed.
    let mut compressed = make_4x4_u8();
    compressed.codec = Some(crate::codec::Codec {
        name: crate::codec::CodecName::LZ4,
        compressed_size: 4,
        level: 0,
        shuffle: 0,
        compressor: 0,
    });

    let whole_axis = || NDDimension {
        size: 4,
        offset: 0,
        binning: 1,
        reverse: false,
    };
    let requested = vec![whole_axis(), whole_axis()];

    let converted = pool.convert(&compressed, &requested, NDDataType::UInt8);
    assert!(converted.is_err());

    let retyped = pool.convert_type(&compressed, NDDataType::UInt16);
    assert!(retyped.is_err());
}
1469
/// Releasing an array into a pool that did not allocate it must leave that
/// pool's byte and free-buffer counters untouched.
#[test]
fn test_release_foreign_array_rejected() {
    let owner = NDArrayPool::new(1_000_000);
    let stranger = NDArrayPool::new(1_000_000);

    let foreign = owner
        .alloc(vec![NDDimension::new(100)], NDDataType::UInt8)
        .unwrap();

    // Snapshot the receiving pool's counters before the bogus release.
    let (bytes_before, free_before) = (stranger.allocated_bytes(), stranger.num_free_buffers());

    stranger.release(foreign);

    assert_eq!(stranger.allocated_bytes(), bytes_before);
    assert_eq!(stranger.num_free_buffers(), free_before);
}
1485
/// An array built directly with `NDArray::new` (pool id 0) must not be
/// accepted by `release`: the free list stays empty and no bytes are
/// accounted.
#[test]
fn test_release_non_pool_array_rejected() {
    let pool = NDArrayPool::new(1_000_000);

    let standalone = NDArray::new(vec![NDDimension::new(10)], NDDataType::UInt8);
    assert_eq!(standalone.pool_id, 0);

    pool.release(standalone);

    assert_eq!(pool.num_free_buffers(), 0);
    assert_eq!(pool.allocated_bytes(), 0);
}
1496
/// `copy` with no destination must hand back a pool-owned array that has the
/// source's dimension count and a byte-for-byte copy of its data.
#[test]
fn test_copy_allocates_and_copies() {
    let pool = NDArrayPool::new(1_000_000);

    let mut original = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
    if let NDDataBuffer::U8(bytes) = &mut original.data {
        bytes.copy_from_slice(&[9, 8, 7, 6]);
    }

    // NOTE(review): the three `true` flags are passed through unchanged from
    // the original test; their meaning is defined by `NDArrayPool::copy`.
    let duplicate = pool.copy(&original, None, true, true, true).unwrap();
    assert_eq!(duplicate.pool_id, pool.id());
    assert_eq!(duplicate.dims.len(), 1);

    match &duplicate.data {
        NDDataBuffer::U8(bytes) => assert_eq!(bytes, &[9, 8, 7, 6]),
        _ => panic!("wrong type"),
    }
}
1514
/// Pre-allocating three buffers from a template array must leave exactly
/// three buffers on the pool's free list.
#[test]
fn test_pre_allocate_buffers_warms_free_list() {
    let pool = NDArrayPool::new(10_000_000);

    // The template stays checked out; only the pre-allocated copies are free.
    let shape = vec![NDDimension::new(256)];
    let prototype = pool.alloc(shape, NDDataType::UInt16).unwrap();

    pool.pre_allocate_buffers(&prototype, 3).unwrap();

    assert_eq!(pool.num_free_buffers(), 3);
}
1525
/// Stress test: concurrent allocations that reuse free-list buffers at a
/// larger tracked size must never push `allocated_bytes` past `max_memory`.
///
/// Each of the 50 rounds warms a 2000-byte pool with 16 buffers that are
/// tracked as 100 bytes but own at least 200 bytes of capacity, releases them
/// all (1600 bytes accounted, 16 free), then races 16 threads that each ask
/// for 200 bytes. The cap must hold both as observed by each successful
/// thread and after all threads have joined.
///
/// NOTE(review): the requests are presumably served by growing the warmed
/// free-list buffers in place — confirm against the reuse path in `alloc`.
#[test]
fn test_concurrent_reuse_grow_does_not_overshoot_max_memory() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::thread;

    const N: usize = 16;
    for _ in 0..50 {
        let pool = Arc::new(NDArrayPool::new(2000));

        // Check out N buffers, swapping each backing vector for one with
        // spare capacity while the tracked `data_size` stays at 100.
        let warmed: Vec<NDArray> = (0..N)
            .map(|_| {
                let mut buf = pool
                    .alloc(vec![NDDimension::new(100)], NDDataType::UInt8)
                    .unwrap();
                if let NDDataBuffer::U8(bytes) = &mut buf.data {
                    let mut roomy = Vec::with_capacity(200);
                    roomy.resize(100, 0u8);
                    *bytes = roomy;
                }
                assert_eq!(buf.data_size, 100);
                assert!(buf.data.capacity_bytes() >= 200);
                buf
            })
            .collect();

        // Only once every buffer exists are they handed back to the pool.
        warmed.into_iter().for_each(|buf| pool.release(buf));
        assert_eq!(pool.allocated_bytes(), 1600);
        assert_eq!(pool.num_free_buffers(), N as u32);

        let overshoot = Arc::new(AtomicBool::new(false));
        let workers: Vec<_> = (0..N)
            .map(|_| {
                let pool = Arc::clone(&pool);
                let overshoot = Arc::clone(&overshoot);
                thread::spawn(move || {
                    // The result is held until the check is done so a
                    // successful allocation stays outstanding meanwhile.
                    let outcome = pool.alloc(vec![NDDimension::new(200)], NDDataType::UInt8);
                    if outcome.is_ok() && pool.allocated_bytes() > pool.max_memory() as u64 {
                        overshoot.store(true, Ordering::Relaxed);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        assert!(
            !overshoot.load(Ordering::Relaxed),
            "allocated_bytes overshot max_memory during concurrent reuse-grow"
        );
        assert!(
            pool.allocated_bytes() <= pool.max_memory() as u64,
            "final allocated_bytes {} > max_memory {}",
            pool.allocated_bytes(),
            pool.max_memory()
        );
    }
}
1610
/// After one allocation, `report(10)` must produce text that mentions both
/// `NDArrayPool` and `numBuffers`.
#[test]
fn test_pool_report_nonempty() {
    let pool = NDArrayPool::new(1_000_000);

    // The array itself is not needed; it is dropped right away.
    drop(
        pool.alloc(vec![NDDimension::new(10)], NDDataType::UInt8)
            .unwrap(),
    );

    let text = pool.report(10);
    assert!(text.contains("NDArrayPool"));
    assert!(text.contains("numBuffers"));
}
1622}