Skip to main content

ad_plugins_rs/
std_arrays.rs

1use std::sync::Arc;
2
3use ad_core_rs::ndarray::NDArray;
4use ad_core_rs::ndarray_pool::NDArrayPool;
5use ad_core_rs::plugin::runtime::{NDPluginProcess, PluginRuntimeHandle, ProcessResult};
6use ad_core_rs::plugin::wiring::WiringRegistry;
7use parking_lot::Mutex;
8
9/// Pure processing logic: stores the latest array and passes it through.
10pub struct StdArraysProcessor {
11    latest_data: Arc<Mutex<Option<Arc<NDArray>>>>,
12}
13
14impl StdArraysProcessor {
15    pub fn new() -> Self {
16        Self {
17            latest_data: Arc::new(Mutex::new(None)),
18        }
19    }
20
21    /// Get a cloneable handle to the latest array.
22    pub fn data_handle(&self) -> Arc<Mutex<Option<Arc<NDArray>>>> {
23        self.latest_data.clone()
24    }
25}
26
27impl Default for StdArraysProcessor {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl NDPluginProcess for StdArraysProcessor {
34    fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
35        let out = Arc::new(array.clone());
36        *self.latest_data.lock() = Some(out.clone());
37        ProcessResult::arrays(vec![out])
38    }
39
40    fn plugin_type(&self) -> &str {
41        "NDPluginStdArrays"
42    }
43
44    fn array_data_handle(&self) -> Option<Arc<Mutex<Option<Arc<NDArray>>>>> {
45        Some(self.latest_data.clone())
46    }
47}
48
49/// Create a StdArrays plugin runtime.
50pub fn create_std_arrays_runtime(
51    port_name: &str,
52    pool: Arc<NDArrayPool>,
53    ndarray_port: &str,
54    wiring: Arc<WiringRegistry>,
55) -> (
56    PluginRuntimeHandle,
57    Arc<Mutex<Option<Arc<NDArray>>>>,
58    std::thread::JoinHandle<()>,
59) {
60    let processor = StdArraysProcessor::new();
61    let data_handle = processor.data_handle();
62
63    let (handle, data_jh) = ad_core_rs::plugin::runtime::create_plugin_runtime(
64        port_name,
65        processor,
66        pool,
67        1, // LatestOnly semantics
68        ndarray_port,
69        wiring,
70    );
71
72    (handle, data_handle, data_jh)
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use ad_core_rs::ndarray::{NDDataType, NDDimension};

    /// Builds a one-dimensional, 4-element `UInt8` array whose `unique_id`
    /// is set to `id`, so tests can recognise it after processing.
    fn make_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Calling the processor directly must emit exactly one array and store
    /// one in the shared slot, both carrying the input's `unique_id`.
    #[test]
    fn test_processor_stores_and_passes_through() {
        let mut proc = StdArraysProcessor::new();
        let pool = NDArrayPool::new(1_000_000);

        // Tag the input so the output and the stored copy can be identified,
        // rather than only checking that "something" was stored.
        let arr = make_array(7);
        let result = proc.process_array(&arr, &pool);
        assert_eq!(result.output_arrays.len(), 1);
        assert_eq!(result.output_arrays[0].unique_id, 7);

        let latest = proc.data_handle().lock().clone();
        assert_eq!(latest.map(|a| a.unique_id), Some(7));
    }

    /// An array sent through the runtime must end up in the shared slot.
    #[test]
    fn test_std_arrays_runtime() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let wiring = Arc::new(WiringRegistry::new());
        let (handle, data, _jh) = create_std_arrays_runtime("IMAGE1", pool, "", wiring);

        // Plugins default to disabled — enable for test
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));

        handle.array_sender().send(make_array(42));

        // Poll with a bounded deadline instead of one fixed sleep: the test
        // finishes as soon as the array arrives and tolerates a slow machine.
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        let latest = loop {
            // Take the snapshot in its own statement so the lock is released
            // before sleeping.
            let snapshot = data.lock().clone();
            if let Some(arr) = snapshot {
                break arr;
            }
            assert!(
                std::time::Instant::now() < deadline,
                "runtime did not store the array before the deadline"
            );
            std::thread::sleep(std::time::Duration::from_millis(5));
        };
        assert_eq!(latest.unique_id, 42);
    }
}