ad_plugins_rs/
std_arrays.rs1use std::sync::Arc;
2
3use ad_core_rs::ndarray::NDArray;
4use ad_core_rs::ndarray_pool::NDArrayPool;
5use ad_core_rs::plugin::runtime::{NDPluginProcess, PluginRuntimeHandle, ProcessResult};
6use ad_core_rs::plugin::wiring::WiringRegistry;
7use parking_lot::Mutex;
8
9pub struct StdArraysProcessor {
11 latest_data: Arc<Mutex<Option<Arc<NDArray>>>>,
12}
13
14impl StdArraysProcessor {
15 pub fn new() -> Self {
16 Self {
17 latest_data: Arc::new(Mutex::new(None)),
18 }
19 }
20
21 pub fn data_handle(&self) -> Arc<Mutex<Option<Arc<NDArray>>>> {
23 self.latest_data.clone()
24 }
25}
26
27impl Default for StdArraysProcessor {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl NDPluginProcess for StdArraysProcessor {
34 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
35 let out = Arc::new(array.clone());
36 *self.latest_data.lock() = Some(out.clone());
37 ProcessResult::arrays(vec![out])
38 }
39
40 fn plugin_type(&self) -> &str {
41 "NDPluginStdArrays"
42 }
43
44 fn array_data_handle(&self) -> Option<Arc<Mutex<Option<Arc<NDArray>>>>> {
45 Some(self.latest_data.clone())
46 }
47}
48
49pub fn create_std_arrays_runtime(
51 port_name: &str,
52 pool: Arc<NDArrayPool>,
53 ndarray_port: &str,
54 wiring: Arc<WiringRegistry>,
55) -> (
56 PluginRuntimeHandle,
57 Arc<Mutex<Option<Arc<NDArray>>>>,
58 std::thread::JoinHandle<()>,
59) {
60 let processor = StdArraysProcessor::new();
61 let data_handle = processor.data_handle();
62
63 let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime(
64 port_name,
65 processor,
66 pool,
67 1, ndarray_port,
69 wiring,
70 );
71
72 (handle, data_handle, data_jh)
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core_rs::ndarray::{NDDataType, NDDimension};
    use std::time::{Duration, Instant};

    /// Builds a small 1-D `UInt8` array (4 elements) tagged with `id`.
    fn make_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Polls `data` until it holds an array or `timeout` elapses.
    ///
    /// Returns the stored array, or `None` if nothing arrived in time. Polling
    /// with a deadline avoids depending on a single fixed sleep being long
    /// enough for the runtime thread to deliver the array.
    fn wait_for_array(
        data: &Mutex<Option<Arc<NDArray>>>,
        timeout: Duration,
    ) -> Option<Arc<NDArray>> {
        let deadline = Instant::now() + timeout;
        loop {
            // Clone out of the slot so the lock is released before sleeping.
            let current = data.lock().clone();
            if current.is_some() {
                return current;
            }
            if Instant::now() >= deadline {
                return None;
            }
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    /// Calling the processor directly yields one output array and stores the
    /// array (with its `unique_id` intact) in the shared slot.
    #[test]
    fn test_processor_stores_and_passes_through() {
        let mut proc = StdArraysProcessor::new();
        let pool = NDArrayPool::new(1_000_000);

        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = 7;
        let result = proc.process_array(&arr, &pool);
        assert_eq!(result.output_arrays.len(), 1);

        let latest = proc.data_handle().lock().clone();
        let latest = latest.expect("processor should have stored the array");
        assert_eq!(latest.unique_id, 7);
    }

    /// An array sent through the runtime ends up in the shared slot.
    #[test]
    fn test_std_arrays_runtime() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let wiring = Arc::new(WiringRegistry::new());
        let (handle, data, _jh) = create_std_arrays_runtime("IMAGE1", pool, "", wiring);

        // Turn callbacks on so the plugin processes incoming arrays.
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
            .unwrap();
        // NOTE(review): short grace period kept from the original test;
        // presumably lets the enable take effect before sending — confirm.
        std::thread::sleep(Duration::from_millis(10));

        handle.array_sender().send(make_array(42));

        // Wait up to a generous deadline instead of a fixed sleep.
        let latest = wait_for_array(&data, Duration::from_secs(2))
            .expect("runtime should deliver the array to the shared slot");
        assert_eq!(latest.unique_id, 42);
    }
}