1use std::sync::Arc;
2
3use ad_core::ndarray::NDArray;
4use ad_core::ndarray_pool::NDArrayPool;
5use ad_core::plugin::runtime::{NDPluginProcess, PluginRuntimeHandle, ProcessResult};
6use ad_core::plugin::wiring::WiringRegistry;
7use parking_lot::Mutex;
8
9pub struct StdArraysProcessor {
11 latest_data: Arc<Mutex<Option<Arc<NDArray>>>>,
12}
13
14impl StdArraysProcessor {
15 pub fn new() -> Self {
16 Self {
17 latest_data: Arc::new(Mutex::new(None)),
18 }
19 }
20
21 pub fn data_handle(&self) -> Arc<Mutex<Option<Arc<NDArray>>>> {
23 self.latest_data.clone()
24 }
25}
26
27impl Default for StdArraysProcessor {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl NDPluginProcess for StdArraysProcessor {
34 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
35 let out = Arc::new(array.clone());
36 *self.latest_data.lock() = Some(out.clone());
37 ProcessResult::arrays(vec![out])
38 }
39
40 fn plugin_type(&self) -> &str {
41 "NDPluginStdArrays"
42 }
43}
44
45pub fn create_std_arrays_runtime(
47 port_name: &str,
48 pool: Arc<NDArrayPool>,
49 ndarray_port: &str,
50 wiring: Arc<WiringRegistry>,
51) -> (PluginRuntimeHandle, Arc<Mutex<Option<Arc<NDArray>>>>, std::thread::JoinHandle<()>) {
52 let processor = StdArraysProcessor::new();
53 let data_handle = processor.data_handle();
54
55 let (handle, data_jh) = ad_core::plugin::runtime::create_plugin_runtime(
56 port_name,
57 processor,
58 pool,
59 1, ndarray_port,
61 wiring,
62 );
63
64 (handle, data_handle, data_jh)
65}
66
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core::ndarray::{NDDataType, NDDimension};
    use std::time::{Duration, Instant};

    /// Builds a 4-element `UInt8` array whose `unique_id` is set to `id`.
    fn make_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Calling the processor directly must emit one output array and leave an
    /// array with the same `unique_id` in the shared slot.
    #[test]
    fn test_processor_stores_and_passes_through() {
        let mut proc = StdArraysProcessor::new();
        let pool = NDArrayPool::new(1_000_000);

        let arr = make_array(7);
        let result = proc.process_array(&arr, &pool);
        assert_eq!(result.output_arrays.len(), 1);

        let latest = proc.data_handle().lock().clone();
        let latest = latest.expect("processor should have stored the processed array");
        assert_eq!(latest.unique_id, 7);
    }

    /// An array sent through the runtime must eventually show up in the
    /// shared slot returned by `create_std_arrays_runtime`.
    #[test]
    fn test_std_arrays_runtime() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let wiring = Arc::new(WiringRegistry::new());
        let (handle, data, _jh) = create_std_arrays_runtime("IMAGE1", pool, "", wiring);

        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
            .expect("writing enable_callbacks should succeed");
        // NOTE(review): presumably gives the runtime time to act on the enable
        // flag before the array is sent; kept from the original test.
        std::thread::sleep(Duration::from_millis(10));

        handle.array_sender().send(make_array(42));

        // Poll with a deadline instead of a single fixed sleep so the test is
        // not sensitive to scheduling delays, yet finishes quickly when the
        // array arrives promptly.
        let deadline = Instant::now() + Duration::from_secs(2);
        let latest = loop {
            let snapshot = data.lock().clone();
            if snapshot.is_some() || Instant::now() >= deadline {
                break snapshot;
            }
            std::thread::sleep(Duration::from_millis(5));
        };

        let latest = latest.expect("runtime should have stored the array before the deadline");
        assert_eq!(latest.unique_id, 42);
    }
}