Skip to main content

ad_plugins/
std_arrays.rs

1use std::sync::Arc;
2
3use ad_core::ndarray::NDArray;
4use ad_core::ndarray_pool::NDArrayPool;
5use ad_core::plugin::runtime::{NDPluginProcess, PluginRuntimeHandle, ProcessResult};
6use ad_core::plugin::wiring::WiringRegistry;
7use parking_lot::Mutex;
8
9/// Pure processing logic: stores the latest array and passes it through.
10pub struct StdArraysProcessor {
11    latest_data: Arc<Mutex<Option<Arc<NDArray>>>>,
12}
13
14impl StdArraysProcessor {
15    pub fn new() -> Self {
16        Self {
17            latest_data: Arc::new(Mutex::new(None)),
18        }
19    }
20
21    /// Get a cloneable handle to the latest array.
22    pub fn data_handle(&self) -> Arc<Mutex<Option<Arc<NDArray>>>> {
23        self.latest_data.clone()
24    }
25}
26
27impl Default for StdArraysProcessor {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl NDPluginProcess for StdArraysProcessor {
34    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
35        let out = Arc::new(array.clone());
36        *self.latest_data.lock() = Some(out.clone());
37        ProcessResult::arrays(vec![out])
38    }
39
40    fn plugin_type(&self) -> &str {
41        "NDPluginStdArrays"
42    }
43}
44
45/// Create a StdArrays plugin runtime.
46pub fn create_std_arrays_runtime(
47    port_name: &str,
48    pool: Arc<NDArrayPool>,
49    ndarray_port: &str,
50    wiring: Arc<WiringRegistry>,
51) -> (PluginRuntimeHandle, Arc<Mutex<Option<Arc<NDArray>>>>, std::thread::JoinHandle<()>) {
52    let processor = StdArraysProcessor::new();
53    let data_handle = processor.data_handle();
54
55    let (handle, data_jh) = ad_core::plugin::runtime::create_plugin_runtime(
56        port_name,
57        processor,
58        pool,
59        1, // LatestOnly semantics
60        ndarray_port,
61        wiring,
62    );
63
64    (handle, data_handle, data_jh)
65}
66
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core::ndarray::{NDDataType, NDDimension};
    use std::time::{Duration, Instant};

    /// Builds a small 1-D `UInt8` array (one dimension of size 4) tagged with
    /// the given `unique_id`.
    fn make_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Calling the processor directly must yield exactly one output array and
    /// leave a copy of the input (same `unique_id`) in the shared slot.
    #[test]
    fn test_processor_stores_and_passes_through() {
        let mut proc = StdArraysProcessor::new();
        let pool = NDArrayPool::new(1_000_000);

        let arr = make_array(7);
        let result = proc.process_array(&arr, &pool);
        assert_eq!(result.output_arrays.len(), 1);

        let latest = proc.data_handle().lock().clone();
        let latest = latest.expect("process_array should have stored the array");
        assert_eq!(latest.unique_id, 7);
    }

    /// An array sent through the runtime must end up in the shared slot.
    #[test]
    fn test_std_arrays_runtime() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let wiring = Arc::new(WiringRegistry::new());
        let (handle, data, _jh) = create_std_arrays_runtime("IMAGE1", pool, "", wiring);

        // Plugins default to disabled — enable for test
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
            .unwrap();
        // Short grace period kept from the original test; presumably lets the
        // runtime observe the new setting before the array is sent.
        std::thread::sleep(Duration::from_millis(10));

        handle.array_sender().send(make_array(42));

        // Poll for the result instead of sleeping a fixed amount: this returns
        // as soon as the array has been processed and tolerates a slow or
        // heavily loaded machine up to the deadline.
        let deadline = Instant::now() + Duration::from_secs(2);
        let latest = loop {
            // Bind the snapshot first so the lock is released before sleeping.
            let snapshot = data.lock().clone();
            if let Some(arr) = snapshot {
                break arr;
            }
            assert!(
                Instant::now() < deadline,
                "timed out waiting for the runtime to process the array"
            );
            std::thread::sleep(Duration::from_millis(5));
        };

        assert_eq!(latest.unique_id, 42);
    }
}