Skip to main content

ad_core/plugin/
runtime.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4
5use asyn_rs::error::AsynResult;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::runtime::config::RuntimeConfig;
8use asyn_rs::runtime::port::{create_port_runtime, PortRuntimeHandle};
9use asyn_rs::user::AsynUser;
10
11use asyn_rs::port_handle::PortHandle;
12
13use crate::ndarray::NDArray;
14use crate::ndarray_pool::NDArrayPool;
15use crate::params::ndarray_driver::NDArrayDriverParams;
16
17use super::channel::{ndarray_channel, BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender};
18use super::params::PluginBaseParams;
19use super::wiring::WiringRegistry;
20
/// A parameter value carried over the param-change channel from the control
/// plane to the data plane.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// A 32-bit signed integer value.
    Int32(i32),
    /// A double-precision floating point value.
    Float64(f64),
    /// A string value.
    Octet(String),
}

impl ParamChangeValue {
    /// Integer view of the value.
    ///
    /// `Float64` is converted with an `as` cast (fraction discarded);
    /// `Octet` has no numeric meaning and yields `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(n) => n,
            Self::Float64(x) => x as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Floating point view of the value.
    ///
    /// `Int32` is widened to `f64`; `Octet` yields `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Float64(x) => x,
            Self::Int32(n) => n as f64,
            Self::Octet(_) => 0.0,
        }
    }

    /// String view of the value: `Some` only for `Octet`, `None` for the
    /// numeric variants.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
53
/// A single parameter update produced by a plugin's `process_array`.
///
/// Each variant names the parameter (`reason`), the address it applies to
/// (`addr`), and the new `value`.
///
/// Derives `Debug`, `Clone` and `PartialEq` so updates can be logged,
/// duplicated and compared, in line with `ParamChangeValue`.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamUpdate {
    /// Integer parameter update.
    Int32 { reason: usize, addr: i32, value: i32 },
    /// Floating point parameter update.
    Float64 { reason: usize, addr: i32, value: f64 },
}

impl ParamUpdate {
    /// Create an Int32 update at addr 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Create a Float64 update at addr 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Create an Int32 update at a specific addr.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 { reason, addr, value }
    }

    /// Create a Float64 update at a specific addr.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 { reason, addr, value }
    }
}
78
79/// Result of processing one array: output arrays + param updates to write back.
80pub struct ProcessResult {
81    pub output_arrays: Vec<Arc<NDArray>>,
82    pub param_updates: Vec<ParamUpdate>,
83}
84
85impl ProcessResult {
86    /// Convenience: sink plugin with only param updates, no output arrays.
87    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
88        Self { output_arrays: vec![], param_updates }
89    }
90
91    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
92    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
93        Self { output_arrays, param_updates: vec![] }
94    }
95
96    /// Convenience: no outputs, no param updates.
97    pub fn empty() -> Self {
98        Self { output_arrays: vec![], param_updates: vec![] }
99    }
100}
101
102/// Pure processing logic. No threading concerns.
103pub trait NDPluginProcess: Send + 'static {
104    /// Process one array. Return output arrays and param updates.
105    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
106
107    /// Plugin type name for PLUGIN_TYPE param.
108    fn plugin_type(&self) -> &str;
109
110    /// Register plugin-specific params on the base. Called once during construction.
111    fn register_params(&mut self, _base: &mut PortDriverBase) -> Result<(), asyn_rs::error::AsynError> {
112        Ok(())
113    }
114
115    /// Called when a param changes. Reason is the param index.
116    fn on_param_change(&mut self, _reason: usize, _params: &PluginParamSnapshot) {}
117}
118
119/// Read-only snapshot of param values available to the processing thread.
120pub struct PluginParamSnapshot {
121    pub enable_callbacks: bool,
122    /// The param reason that changed.
123    pub reason: usize,
124    /// The address (sub-device) that changed.
125    pub addr: i32,
126    /// The new value.
127    pub value: ParamChangeValue,
128}
129
130/// Shared processor state protected by a mutex, accessible from both
131/// the data thread (non-blocking mode) and the caller thread (blocking mode).
132struct SharedProcessorInner<P: NDPluginProcess> {
133    processor: P,
134    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
135    pool: Arc<NDArrayPool>,
136    ndarray_params: NDArrayDriverParams,
137    plugin_params: PluginBaseParams,
138    port_handle: PortHandle,
139    array_counter: i32,
140}
141
142impl<P: NDPluginProcess> SharedProcessorInner<P> {
143    fn process_and_publish(&mut self, array: &NDArray) {
144        let t0 = std::time::Instant::now();
145        let result = self.processor.process_array(array, &self.pool);
146        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
147
148        // Publish output arrays to downstream plugins
149        let output = self.output.lock();
150        for out in &result.output_arrays {
151            output.publish(out.clone());
152        }
153        drop(output);
154
155        // Update base NDArrayDriver params from output array metadata.
156        // Use the first output array if available (reflects ROI/binning/transform),
157        // otherwise fall back to the input array (for sink plugins like Stats).
158        self.array_counter += 1;
159        let report_arr = result.output_arrays.first().map(|a| a.as_ref()).unwrap_or(array);
160        let info = report_arr.info();
161        let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
162        self.port_handle.write_int32_no_wait(self.ndarray_params.array_counter, 0, self.array_counter);
163        self.port_handle.write_int32_no_wait(self.ndarray_params.unique_id, 0, report_arr.unique_id);
164        self.port_handle.write_int32_no_wait(self.ndarray_params.n_dimensions, 0, report_arr.dims.len() as i32);
165        self.port_handle.write_int32_no_wait(self.ndarray_params.array_size_x, 0, info.x_size as i32);
166        self.port_handle.write_int32_no_wait(self.ndarray_params.array_size_y, 0, info.y_size as i32);
167        self.port_handle.write_int32_no_wait(self.ndarray_params.array_size_z, 0, info.color_size as i32);
168        self.port_handle.write_int32_no_wait(self.ndarray_params.array_size, 0, info.total_bytes as i32);
169        self.port_handle.write_int32_no_wait(self.ndarray_params.data_type, 0, report_arr.data.data_type() as i32);
170        self.port_handle.write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
171
172        let ts_f64 = array.timestamp.as_f64();
173        self.port_handle.write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
174        self.port_handle.write_int32_no_wait(self.ndarray_params.epics_ts_sec, 0, array.timestamp.sec as i32);
175        self.port_handle.write_int32_no_wait(self.ndarray_params.epics_ts_nsec, 0, array.timestamp.nsec as i32);
176
177        self.port_handle.write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
178
179        // Collect unique addrs that have updates (beyond addr 0 which is always flushed)
180        let mut extra_addrs: Vec<i32> = Vec::new();
181        for update in &result.param_updates {
182            match update {
183                ParamUpdate::Int32 { reason, addr, value } => {
184                    self.port_handle.write_int32_no_wait(*reason, *addr, *value);
185                    if *addr != 0 && !extra_addrs.contains(addr) {
186                        extra_addrs.push(*addr);
187                    }
188                }
189                ParamUpdate::Float64 { reason, addr, value } => {
190                    self.port_handle.write_float64_no_wait(*reason, *addr, *value);
191                    if *addr != 0 && !extra_addrs.contains(addr) {
192                        extra_addrs.push(*addr);
193                    }
194                }
195            }
196        }
197
198        self.port_handle.call_param_callbacks_no_wait(0);
199        for addr in extra_addrs {
200            self.port_handle.call_param_callbacks_no_wait(addr);
201        }
202    }
203}
204
205/// Type-erased handle for blocking mode: allows NDArraySender to call
206/// process_and_publish without knowing the concrete processor type.
207struct BlockingProcessorHandle<P: NDPluginProcess> {
208    inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
209}
210
211impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
212    fn process_and_publish(&self, array: &NDArray) {
213        self.inner.lock().process_and_publish(array);
214    }
215}
216
217/// PortDriver implementation for a plugin's control plane.
218#[allow(dead_code)]
219pub struct PluginPortDriver {
220    base: PortDriverBase,
221    ndarray_params: NDArrayDriverParams,
222    plugin_params: PluginBaseParams,
223    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
224}
225
226impl PluginPortDriver {
227    fn new<P: NDPluginProcess>(
228        port_name: &str,
229        plugin_type_name: &str,
230        queue_size: usize,
231        ndarray_port: &str,
232        max_addr: usize,
233        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
234        processor: &mut P,
235    ) -> AsynResult<Self> {
236        let mut base = PortDriverBase::new(
237            port_name,
238            max_addr,
239            PortFlags {
240                can_block: true,
241                ..Default::default()
242            },
243        );
244
245        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
246        let plugin_params = PluginBaseParams::create(&mut base)?;
247
248        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
249        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
250        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
251        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
252        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
253        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
254        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
255        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
256
257        // Set plugin identity params
258        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
259        if !ndarray_port.is_empty() {
260            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
261        }
262
263        // Let the processor register its plugin-specific params
264        processor.register_params(&mut base)?;
265
266        Ok(Self {
267            base,
268            ndarray_params,
269            plugin_params,
270            param_change_tx,
271        })
272    }
273}
274
275impl PortDriver for PluginPortDriver {
276    fn base(&self) -> &PortDriverBase {
277        &self.base
278    }
279
280    fn base_mut(&mut self) -> &mut PortDriverBase {
281        &mut self.base
282    }
283
284    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
285        let reason = user.reason;
286        let addr = user.addr;
287        self.base.set_int32_param(reason, addr, value)?;
288        self.base.call_param_callbacks(addr)?;
289        let _ = self.param_change_tx.try_send((reason, addr, ParamChangeValue::Int32(value)));
290        Ok(())
291    }
292
293    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
294        let reason = user.reason;
295        let addr = user.addr;
296        self.base.set_float64_param(reason, addr, value)?;
297        self.base.call_param_callbacks(addr)?;
298        let _ = self.param_change_tx.try_send((reason, addr, ParamChangeValue::Float64(value)));
299        Ok(())
300    }
301
302    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
303        let reason = user.reason;
304        let addr = user.addr;
305        let s = String::from_utf8_lossy(data).into_owned();
306        self.base.set_string_param(reason, addr, s.clone())?;
307        self.base.call_param_callbacks(addr)?;
308        let _ = self.param_change_tx.try_send((reason, addr, ParamChangeValue::Octet(s)));
309        Ok(())
310    }
311}
312
313/// Handle to a running plugin runtime. Provides access to sender and port handle.
314#[derive(Clone)]
315pub struct PluginRuntimeHandle {
316    port_runtime: PortRuntimeHandle,
317    array_sender: NDArraySender,
318    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
319    port_name: String,
320    pub ndarray_params: NDArrayDriverParams,
321    pub plugin_params: PluginBaseParams,
322}
323
324impl PluginRuntimeHandle {
325    pub fn port_runtime(&self) -> &PortRuntimeHandle {
326        &self.port_runtime
327    }
328
329    pub fn array_sender(&self) -> &NDArraySender {
330        &self.array_sender
331    }
332
333    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
334        &self.array_output
335    }
336
337    pub fn port_name(&self) -> &str {
338        &self.port_name
339    }
340}
341
342/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
343///
344/// Returns:
345/// - `PluginRuntimeHandle` for wiring and control
346/// - `PortRuntimeHandle` for param I/O
347/// - `JoinHandle` for the data processing thread
348pub fn create_plugin_runtime<P: NDPluginProcess>(
349    port_name: &str,
350    processor: P,
351    pool: Arc<NDArrayPool>,
352    queue_size: usize,
353    ndarray_port: &str,
354    wiring: Arc<WiringRegistry>,
355) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
356    create_plugin_runtime_multi_addr(port_name, processor, pool, queue_size, ndarray_port, wiring, 1)
357}
358
359/// Create a plugin runtime with multi-addr support.
360///
361/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
362pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
363    port_name: &str,
364    mut processor: P,
365    pool: Arc<NDArrayPool>,
366    queue_size: usize,
367    ndarray_port: &str,
368    wiring: Arc<WiringRegistry>,
369    max_addr: usize,
370) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
371    // Param change channel (control plane -> data plane)
372    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
373
374    // Capture plugin type before mutable borrow
375    let plugin_type_name = processor.plugin_type().to_string();
376
377    // Create the port driver for control plane
378    let driver = PluginPortDriver::new(port_name, &plugin_type_name, queue_size, ndarray_port, max_addr, param_tx, &mut processor)
379        .expect("failed to create plugin port driver");
380
381    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
382    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
383    let ndarray_params = driver.ndarray_params;
384    let plugin_params = driver.plugin_params;
385
386    // Create port runtime (actor thread for param I/O)
387    let (port_runtime, _actor_jh) =
388        create_port_runtime(driver, RuntimeConfig::default());
389
390    // Clone port handle for the data thread to write params back
391    let port_handle = port_runtime.port_handle().clone();
392
393    // Array channel (data plane)
394    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
395
396    // Shared mode flags
397    let enabled = Arc::new(AtomicBool::new(false));
398    let blocking_mode = Arc::new(AtomicBool::new(false));
399
400    // Shared processor (accessible from both data thread and caller thread)
401    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
402    let array_output_for_handle = array_output.clone();
403    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
404        processor,
405        output: array_output,
406        pool,
407        ndarray_params,
408        plugin_params,
409        port_handle,
410        array_counter: 0,
411    }));
412
413    // Type-erased handle for blocking mode
414    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
415        inner: shared.clone(),
416    });
417
418    let data_enabled = enabled.clone();
419    let data_blocking = blocking_mode.clone();
420    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
421
422    // Capture wiring info for data loop
423    let nd_array_port_reason = plugin_params.nd_array_port;
424    let sender_port_name = port_name.to_string();
425    let initial_upstream = ndarray_port.to_string();
426
427    // Spawn data processing thread
428    let data_jh = thread::Builder::new()
429        .name(format!("plugin-data-{port_name}"))
430        .spawn(move || {
431            plugin_data_loop(
432                shared,
433                array_rx,
434                param_rx,
435                enable_callbacks_reason,
436                blocking_callbacks_reason,
437                data_enabled,
438                data_blocking,
439                nd_array_port_reason,
440                sender_port_name,
441                initial_upstream,
442                wiring,
443            );
444        })
445        .expect("failed to spawn plugin data thread");
446
447    let handle = PluginRuntimeHandle {
448        port_runtime,
449        array_sender,
450        array_output: array_output_for_handle,
451        port_name: port_name.to_string(),
452        ndarray_params,
453        plugin_params,
454    };
455
456    (handle, data_jh)
457}
458
459fn plugin_data_loop<P: NDPluginProcess>(
460    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
461    mut array_rx: NDArrayReceiver,
462    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
463    enable_callbacks_reason: usize,
464    blocking_callbacks_reason: usize,
465    enabled: Arc<AtomicBool>,
466    blocking_mode: Arc<AtomicBool>,
467    nd_array_port_reason: usize,
468    sender_port_name: String,
469    initial_upstream: String,
470    wiring: Arc<WiringRegistry>,
471) {
472    let mut current_upstream = initial_upstream;
473    let rt = tokio::runtime::Builder::new_current_thread()
474        .enable_all()
475        .build()
476        .unwrap();
477    rt.block_on(async {
478        loop {
479            tokio::select! {
480                msg = array_rx.recv_msg() => {
481                    match msg {
482                        Some(msg) => {
483                            // In blocking mode, arrays are processed inline by the caller.
484                            // Skip processing here to avoid double-processing.
485                            if !blocking_mode.load(Ordering::Acquire) {
486                                shared.lock().process_and_publish(&msg.array);
487                            }
488                            // msg dropped here → completion signaled (if tracked)
489                        }
490                        None => break,
491                    }
492                }
493                param = param_rx.recv() => {
494                    match param {
495                        Some((reason, addr, value)) => {
496                            if reason == enable_callbacks_reason {
497                                enabled.store(value.as_i32() != 0, Ordering::Release);
498                            }
499                            if reason == blocking_callbacks_reason {
500                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
501                            }
502                            // Handle NDArrayPort rewiring
503                            if reason == nd_array_port_reason {
504                                if let Some(new_port) = value.as_string() {
505                                    let old = std::mem::replace(&mut current_upstream, new_port.to_string());
506                                    if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
507                                        eprintln!("NDArrayPort rewire failed: {e}");
508                                        // Revert current_upstream on failure
509                                        current_upstream = old;
510                                    }
511                                }
512                            }
513                            let snapshot = PluginParamSnapshot {
514                                enable_callbacks: enabled.load(Ordering::Acquire),
515                                reason,
516                                addr,
517                                value,
518                            };
519                            shared.lock().processor.on_param_change(reason, &snapshot);
520                        }
521                        None => break,
522                    }
523                }
524            }
525        }
526    });
527}
528
529/// Connect a downstream plugin's sender to a plugin runtime's output.
530pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
531    upstream.array_output().lock().add(downstream_sender);
532}
533
534/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
535pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
536    port_name: &str,
537    mut processor: P,
538    pool: Arc<NDArrayPool>,
539    queue_size: usize,
540    output: NDArrayOutput,
541    ndarray_port: &str,
542    wiring: Arc<WiringRegistry>,
543) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
544    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
545
546    let plugin_type_name = processor.plugin_type().to_string();
547    let driver = PluginPortDriver::new(port_name, &plugin_type_name, queue_size, ndarray_port, 1, param_tx, &mut processor)
548        .expect("failed to create plugin port driver");
549
550    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
551    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
552    let ndarray_params = driver.ndarray_params;
553    let plugin_params = driver.plugin_params;
554
555    let (port_runtime, _actor_jh) =
556        create_port_runtime(driver, RuntimeConfig::default());
557
558    let port_handle = port_runtime.port_handle().clone();
559
560    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
561
562    let enabled = Arc::new(AtomicBool::new(false));
563    let blocking_mode = Arc::new(AtomicBool::new(false));
564
565    let array_output = Arc::new(parking_lot::Mutex::new(output));
566    let array_output_for_handle = array_output.clone();
567    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
568        processor,
569        output: array_output,
570        pool,
571        ndarray_params,
572        plugin_params,
573        port_handle,
574        array_counter: 0,
575    }));
576
577    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
578        inner: shared.clone(),
579    });
580
581    let data_enabled = enabled.clone();
582    let data_blocking = blocking_mode.clone();
583    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
584
585    // Capture wiring info for data loop
586    let nd_array_port_reason = plugin_params.nd_array_port;
587    let sender_port_name = port_name.to_string();
588    let initial_upstream = ndarray_port.to_string();
589
590    let data_jh = thread::Builder::new()
591        .name(format!("plugin-data-{port_name}"))
592        .spawn(move || {
593            plugin_data_loop(
594                shared,
595                array_rx,
596                param_rx,
597                enable_callbacks_reason,
598                blocking_callbacks_reason,
599                data_enabled,
600                data_blocking,
601                nd_array_port_reason,
602                sender_port_name,
603                initial_upstream,
604                wiring,
605            );
606        })
607        .expect("failed to spawn plugin data thread");
608
609    let handle = PluginRuntimeHandle {
610        port_runtime,
611        array_sender,
612        array_output: array_output_for_handle,
613        port_name: port_name.to_string(),
614        ndarray_params,
615        plugin_params,
616    };
617
618    (handle, data_jh)
619}
620
621#[cfg(test)]
622mod tests {
623    use super::*;
624    use crate::ndarray::{NDDataType, NDDimension};
625    use crate::plugin::channel::ndarray_channel;
626
627    /// Passthrough processor: returns the input array as-is.
628    struct PassthroughProcessor;
629
630    impl NDPluginProcess for PassthroughProcessor {
631        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
632            ProcessResult::arrays(vec![Arc::new(array.clone())])
633        }
634        fn plugin_type(&self) -> &str {
635            "Passthrough"
636        }
637    }
638
639    /// Sink processor: consumes arrays, returns nothing.
640    struct SinkProcessor {
641        count: usize,
642    }
643
644    impl NDPluginProcess for SinkProcessor {
645        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
646            self.count += 1;
647            ProcessResult::empty()
648        }
649        fn plugin_type(&self) -> &str {
650            "Sink"
651        }
652    }
653
654    fn make_test_array(id: i32) -> Arc<NDArray> {
655        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
656        arr.unique_id = id;
657        Arc::new(arr)
658    }
659
660    fn test_wiring() -> Arc<WiringRegistry> {
661        Arc::new(WiringRegistry::new())
662    }
663
664    /// Enable callbacks on a plugin handle (plugins default to disabled).
665    fn enable_callbacks(handle: &PluginRuntimeHandle) {
666        handle
667            .port_runtime()
668            .port_handle()
669            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
670            .unwrap();
671        std::thread::sleep(std::time::Duration::from_millis(10));
672    }
673
674    #[test]
675    fn test_passthrough_runtime() {
676        let pool = Arc::new(NDArrayPool::new(1_000_000));
677
678        // Create downstream receiver
679        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
680        let mut output = NDArrayOutput::new();
681        output.add(downstream_sender);
682
683        let (handle, _data_jh) = create_plugin_runtime_with_output(
684            "PASS1",
685            PassthroughProcessor,
686            pool,
687            10,
688            output,
689            "",
690            test_wiring(),
691        );
692        enable_callbacks(&handle);
693
694        // Send an array
695        handle.array_sender().send(make_test_array(42));
696
697        // Should come out the other side
698        let received = downstream_rx.blocking_recv().unwrap();
699        assert_eq!(received.unique_id, 42);
700    }
701
702    #[test]
703    fn test_sink_runtime() {
704        let pool = Arc::new(NDArrayPool::new(1_000_000));
705
706        let (handle, _data_jh) = create_plugin_runtime(
707            "SINK1",
708            SinkProcessor { count: 0 },
709            pool,
710            10,
711            "",
712            test_wiring(),
713        );
714        enable_callbacks(&handle);
715
716        // Send arrays - they should be consumed silently
717        handle.array_sender().send(make_test_array(1));
718        handle.array_sender().send(make_test_array(2));
719
720        // Give processing thread time
721        std::thread::sleep(std::time::Duration::from_millis(50));
722
723        // No crash, no output needed
724        assert_eq!(handle.port_name(), "SINK1");
725    }
726
727    #[test]
728    fn test_plugin_type_param() {
729        let pool = Arc::new(NDArrayPool::new(1_000_000));
730
731        let (handle, _data_jh) = create_plugin_runtime(
732            "TYPE_TEST",
733            PassthroughProcessor,
734            pool,
735            10,
736            "",
737            test_wiring(),
738        );
739
740        // Verify port name
741        assert_eq!(handle.port_name(), "TYPE_TEST");
742        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
743    }
744
745    #[test]
746    fn test_shutdown_on_handle_drop() {
747        let pool = Arc::new(NDArrayPool::new(1_000_000));
748
749        let (handle, data_jh) = create_plugin_runtime(
750            "SHUTDOWN_TEST",
751            PassthroughProcessor,
752            pool,
753            10,
754            "",
755            test_wiring(),
756        );
757
758        // Drop the handle (closes sender channel, which should cause data thread to exit)
759        let sender = handle.array_sender().clone();
760        drop(handle);
761        drop(sender);
762
763        // Data thread should terminate
764        let result = data_jh.join();
765        assert!(result.is_ok());
766    }
767
768    #[test]
769    fn test_dropped_count_when_queue_full() {
770        let pool = Arc::new(NDArrayPool::new(1_000_000));
771
772        // Very slow processor
773        struct SlowProcessor;
774        impl NDPluginProcess for SlowProcessor {
775            fn process_array(
776                &mut self,
777                _array: &NDArray,
778                _pool: &NDArrayPool,
779            ) -> ProcessResult {
780                std::thread::sleep(std::time::Duration::from_millis(100));
781                ProcessResult::empty()
782            }
783            fn plugin_type(&self) -> &str {
784                "Slow"
785            }
786        }
787
788        let (handle, _data_jh) = create_plugin_runtime(
789            "DROP_TEST",
790            SlowProcessor,
791            pool,
792            1,
793            "",
794            test_wiring(),
795        );
796        enable_callbacks(&handle);
797
798        // Fill the queue and overflow
799        for i in 0..10 {
800            handle.array_sender().send(make_test_array(i));
801        }
802
803        // Some should have been dropped
804        assert!(handle.array_sender().dropped_count() > 0);
805    }
806
807    #[test]
808    fn test_blocking_callbacks_basic() {
809        let pool = Arc::new(NDArrayPool::new(1_000_000));
810        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
811        let mut output = NDArrayOutput::new();
812        output.add(downstream_sender);
813
814        let (handle, _data_jh) = create_plugin_runtime_with_output(
815            "BLOCK_TEST",
816            PassthroughProcessor,
817            pool,
818            10,
819            output,
820            "",
821            test_wiring(),
822        );
823        enable_callbacks(&handle);
824
825        // Enable blocking mode
826        handle
827            .port_runtime()
828            .port_handle()
829            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
830            .unwrap();
831        std::thread::sleep(std::time::Duration::from_millis(50));
832
833        // In blocking mode, send() processes inline and returns synchronously
834        handle.array_sender().send(make_test_array(42));
835
836        // Array should already be in the downstream channel
837        let received = downstream_rx.blocking_recv().unwrap();
838        assert_eq!(received.unique_id, 42);
839    }
840
841    #[test]
842    fn test_blocking_to_nonblocking_switch() {
843        let pool = Arc::new(NDArrayPool::new(1_000_000));
844        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
845        let mut output = NDArrayOutput::new();
846        output.add(downstream_sender);
847
848        let (handle, _data_jh) = create_plugin_runtime_with_output(
849            "SWITCH_TEST",
850            PassthroughProcessor,
851            pool,
852            10,
853            output,
854            "",
855            test_wiring(),
856        );
857        enable_callbacks(&handle);
858
859        // Start in blocking mode
860        handle
861            .port_runtime()
862            .port_handle()
863            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
864            .unwrap();
865        std::thread::sleep(std::time::Duration::from_millis(50));
866
867        handle.array_sender().send(make_test_array(1));
868        let received = downstream_rx.blocking_recv().unwrap();
869        assert_eq!(received.unique_id, 1);
870
871        // Switch back to non-blocking
872        handle
873            .port_runtime()
874            .port_handle()
875            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
876            .unwrap();
877        std::thread::sleep(std::time::Duration::from_millis(50));
878
879        // Send in non-blocking mode — goes through channel to data thread
880        handle.array_sender().send(make_test_array(2));
881        let received = downstream_rx.blocking_recv().unwrap();
882        assert_eq!(received.unique_id, 2);
883    }
884
885    #[test]
886    fn test_enable_callbacks_disables_processing() {
887        let pool = Arc::new(NDArrayPool::new(1_000_000));
888        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
889        let mut output = NDArrayOutput::new();
890        output.add(downstream_sender);
891
892        let (handle, _data_jh) = create_plugin_runtime_with_output(
893            "ENABLE_TEST",
894            PassthroughProcessor,
895            pool,
896            10,
897            output,
898            "",
899            test_wiring(),
900        );
901
902        // Disable callbacks
903        handle
904            .port_runtime()
905            .port_handle()
906            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
907            .unwrap();
908        std::thread::sleep(std::time::Duration::from_millis(50));
909
910        // Send array — should be silently dropped by sender
911        handle.array_sender().send(make_test_array(99));
912
913        // Verify nothing received (with timeout)
914        let rt = tokio::runtime::Builder::new_current_thread()
915            .enable_all()
916            .build()
917            .unwrap();
918        let result = rt.block_on(async {
919            tokio::time::timeout(
920                std::time::Duration::from_millis(100),
921                downstream_rx.recv(),
922            )
923            .await
924        });
925        assert!(
926            result.is_err(),
927            "should not receive array when callbacks disabled"
928        );
929    }
930
931    #[test]
932    fn test_blocking_downstream_receives() {
933        let pool = Arc::new(NDArrayPool::new(1_000_000));
934
935        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
936        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
937        let mut output = NDArrayOutput::new();
938        output.add(ds1);
939        output.add(ds2);
940
941        let (handle, _data_jh) = create_plugin_runtime_with_output(
942            "BLOCK_DS_TEST",
943            PassthroughProcessor,
944            pool,
945            10,
946            output,
947            "",
948            test_wiring(),
949        );
950        enable_callbacks(&handle);
951
952        // Enable blocking mode
953        handle
954            .port_runtime()
955            .port_handle()
956            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
957            .unwrap();
958        std::thread::sleep(std::time::Duration::from_millis(50));
959
960        handle.array_sender().send(make_test_array(77));
961
962        // Both downstream receivers should have the array
963        let r1 = rx1.blocking_recv().unwrap();
964        let r2 = rx2.blocking_recv().unwrap();
965        assert_eq!(r1.unique_id, 77);
966        assert_eq!(r2.unique_id, 77);
967    }
968
969    #[test]
970    fn test_blocking_param_updates() {
971        let pool = Arc::new(NDArrayPool::new(1_000_000));
972
973        struct ParamTracker;
974        impl NDPluginProcess for ParamTracker {
975            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
976                ProcessResult::arrays(vec![Arc::new(array.clone())])
977            }
978            fn plugin_type(&self) -> &str {
979                "ParamTracker"
980            }
981        }
982
983        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
984        let mut output = NDArrayOutput::new();
985        output.add(downstream_sender);
986
987        let (handle, _data_jh) = create_plugin_runtime_with_output(
988            "PARAM_TEST",
989            ParamTracker,
990            pool,
991            10,
992            output,
993            "",
994            test_wiring(),
995        );
996        enable_callbacks(&handle);
997
998        // Enable blocking mode
999        handle
1000            .port_runtime()
1001            .port_handle()
1002            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1003            .unwrap();
1004        std::thread::sleep(std::time::Duration::from_millis(50));
1005
1006        // Send array in blocking mode
1007        handle.array_sender().send(make_test_array(1));
1008        let received = downstream_rx.blocking_recv().unwrap();
1009        assert_eq!(received.unique_id, 1);
1010
1011        // Write enable_callbacks while in blocking mode — should not crash
1012        handle
1013            .port_runtime()
1014            .port_handle()
1015            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1016            .unwrap();
1017        std::thread::sleep(std::time::Duration::from_millis(50));
1018
1019        // Still works after param update
1020        handle.array_sender().send(make_test_array(2));
1021        let received = downstream_rx.blocking_recv().unwrap();
1022        assert_eq!(received.unique_id, 2);
1023    }
1024
1025    /// Phase 0 regression test: process_and_publish inside a current-thread runtime must not panic.
1026    #[test]
1027    fn test_no_panic_in_current_thread_runtime() {
1028        let pool = Arc::new(NDArrayPool::new(1_000_000));
1029        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1030        let mut output = NDArrayOutput::new();
1031        output.add(downstream_sender);
1032
1033        let (handle, _data_jh) = create_plugin_runtime_with_output(
1034            "CURRENT_THREAD_TEST",
1035            PassthroughProcessor,
1036            pool,
1037            10,
1038            output,
1039            "",
1040            test_wiring(),
1041        );
1042        enable_callbacks(&handle);
1043
1044        // Enable blocking mode so process_and_publish runs inline
1045        handle
1046            .port_runtime()
1047            .port_handle()
1048            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1049            .unwrap();
1050        std::thread::sleep(std::time::Duration::from_millis(50));
1051
1052        // Call send (which calls process_and_publish inline) from inside a current-thread runtime
1053        let rt = tokio::runtime::Builder::new_current_thread()
1054            .enable_all()
1055            .build()
1056            .unwrap();
1057        rt.block_on(async {
1058            handle.array_sender().send(make_test_array(99));
1059        });
1060
1061        let received = downstream_rx.blocking_recv().unwrap();
1062        assert_eq!(received.unique_id, 99);
1063    }
1064}