Skip to main content

ad_core_rs/plugin/
runtime.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread;
4
5use asyn_rs::error::AsynResult;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::runtime::config::RuntimeConfig;
8use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
9use asyn_rs::user::AsynUser;
10
11use asyn_rs::port_handle::PortHandle;
12
13use crate::ndarray::NDArray;
14use crate::ndarray_pool::NDArrayPool;
15use crate::params::ndarray_driver::NDArrayDriverParams;
16
17use super::channel::{
18    BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
19};
20use super::params::PluginBaseParams;
21use super::wiring::WiringRegistry;
22
/// Payload carried over the param-change channel from the control plane to the data plane.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// A 32-bit integer value.
    Int32(i32),
    /// A double-precision floating point value.
    Float64(f64),
    /// A string value.
    Octet(String),
}

impl ParamChangeValue {
    /// View the value as an `i32`.
    ///
    /// `Float64` goes through an `as i32` cast (fraction dropped, out-of-range values
    /// saturate, NaN becomes 0); `Octet` always yields `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(n) => n,
            Self::Float64(x) => x as i32,
            Self::Octet(_) => 0,
        }
    }

    /// View the value as an `f64`.
    ///
    /// `Int32` is widened with an `as f64` cast; `Octet` always yields `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Float64(x) => x,
            Self::Int32(n) => n as f64,
            Self::Octet(_) => 0.0,
        }
    }

    /// Borrow the string payload, or `None` when the value is numeric.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
55
/// A single parameter update produced by a plugin's process_array.
///
/// Each variant names the param (`reason`), the address it applies to (`addr`)
/// and the new value to write back through the port handle.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamUpdate {
    /// Write an `i32` value.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// Write an `f64` value.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// Write a string value.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
}

impl ParamUpdate {
    /// Create an Int32 update at addr 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Create a Float64 update at addr 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Create an Octet (string) update at addr 0.
    pub fn octet(reason: usize, value: impl Into<String>) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// Create an Int32 update at a specific addr.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Create a Float64 update at a specific addr.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// Create an Octet (string) update at a specific addr.
    pub fn octet_addr(reason: usize, addr: i32, value: impl Into<String>) -> Self {
        Self::Octet {
            reason,
            addr,
            value: value.into(),
        }
    }
}
109
110/// Result of processing one array: output arrays + param updates to write back.
111pub struct ProcessResult {
112    pub output_arrays: Vec<Arc<NDArray>>,
113    pub param_updates: Vec<ParamUpdate>,
114    /// If set, only publish to the subscriber at this index (round-robin scatter).
115    pub scatter_index: Option<usize>,
116}
117
118impl ProcessResult {
119    /// Convenience: sink plugin with only param updates, no output arrays.
120    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
121        Self {
122            output_arrays: vec![],
123            param_updates,
124            scatter_index: None,
125        }
126    }
127
128    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
129    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
130        Self {
131            output_arrays,
132            param_updates: vec![],
133            scatter_index: None,
134        }
135    }
136
137    /// Convenience: no outputs, no param updates.
138    pub fn empty() -> Self {
139        Self {
140            output_arrays: vec![],
141            param_updates: vec![],
142            scatter_index: None,
143        }
144    }
145
146    /// Convenience: scatter output — send to a single subscriber by index.
147    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
148        Self {
149            output_arrays,
150            param_updates: vec![],
151            scatter_index: Some(index),
152        }
153    }
154}
155
156/// Result of handling a control-plane param change.
157pub struct ParamChangeResult {
158    pub output_arrays: Vec<Arc<NDArray>>,
159    pub param_updates: Vec<ParamUpdate>,
160}
161
162impl ParamChangeResult {
163    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
164        Self {
165            output_arrays: vec![],
166            param_updates,
167        }
168    }
169
170    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
171        Self {
172            output_arrays,
173            param_updates: vec![],
174        }
175    }
176
177    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
178        Self {
179            output_arrays,
180            param_updates,
181        }
182    }
183
184    pub fn empty() -> Self {
185        Self {
186            output_arrays: vec![],
187            param_updates: vec![],
188        }
189    }
190}
191
192/// Pure processing logic. No threading concerns.
193pub trait NDPluginProcess: Send + 'static {
194    /// Process one array. Return output arrays and param updates.
195    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
196
197    /// Plugin type name for PLUGIN_TYPE param.
198    fn plugin_type(&self) -> &str;
199
200    /// Register plugin-specific params on the base. Called once during construction.
201    fn register_params(
202        &mut self,
203        _base: &mut PortDriverBase,
204    ) -> Result<(), asyn_rs::error::AsynError> {
205        Ok(())
206    }
207
208    /// Called when a param changes. Reason is the param index.
209    /// Return param updates to be written back to the port driver.
210    fn on_param_change(
211        &mut self,
212        _reason: usize,
213        _params: &PluginParamSnapshot,
214    ) -> ParamChangeResult {
215        ParamChangeResult::empty()
216    }
217
218    /// Return a handle to the latest NDArray data for array reads.
219    /// Override this in plugins like NDPluginStdArrays that serve pixel data
220    /// via readInt8Array/readInt16Array/etc.
221    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
222        None
223    }
224}
225
226/// Read-only snapshot of param values available to the processing thread.
227pub struct PluginParamSnapshot {
228    pub enable_callbacks: bool,
229    /// The param reason that changed.
230    pub reason: usize,
231    /// The address (sub-device) that changed.
232    pub addr: i32,
233    /// The new value.
234    pub value: ParamChangeValue,
235}
236
237/// Shared processor state protected by a mutex, accessible from both
238/// the data thread (non-blocking mode) and the caller thread (blocking mode).
239struct SharedProcessorInner<P: NDPluginProcess> {
240    processor: P,
241    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
242    pool: Arc<NDArrayPool>,
243    ndarray_params: NDArrayDriverParams,
244    plugin_params: PluginBaseParams,
245    port_handle: PortHandle,
246    array_counter: i32,
247    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
248    std_array_data_param: Option<usize>,
249}
250
251impl<P: NDPluginProcess> SharedProcessorInner<P> {
252    fn process_and_publish(&mut self, array: &NDArray) {
253        let t0 = std::time::Instant::now();
254        let result = self.processor.process_array(array, &self.pool);
255        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
256        self.publish_result(
257            result.output_arrays,
258            result.param_updates,
259            result.scatter_index,
260            Some(array),
261            elapsed_ms,
262        );
263    }
264
265    fn publish_result(
266        &mut self,
267        output_arrays: Vec<Arc<NDArray>>,
268        param_updates: Vec<ParamUpdate>,
269        scatter_index: Option<usize>,
270        fallback_array: Option<&NDArray>,
271        elapsed_ms: f64,
272    ) {
273        let output = self.output.lock();
274        for out in &output_arrays {
275            if let Some(idx) = scatter_index {
276                output.publish_to(idx, out.clone());
277            } else {
278                output.publish(out.clone());
279            }
280        }
281        drop(output);
282
283        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
284            self.array_counter += 1;
285
286            // Fire array data interrupt directly (C EPICS pattern).
287            // Bypasses port actor channel to avoid dropping large array messages.
288            if let Some(param) = self.std_array_data_param {
289                use crate::ndarray::NDDataBuffer;
290                use asyn_rs::param::ParamValue;
291                let value = match &report_arr.data {
292                    NDDataBuffer::I8(v) => {
293                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
294                    }
295                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
296                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
297                    ))),
298                    NDDataBuffer::I16(v) => {
299                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
300                    }
301                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
302                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
303                    ))),
304                    NDDataBuffer::I32(v) => {
305                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
306                    }
307                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
308                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
309                    ))),
310                    NDDataBuffer::I64(v) => {
311                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
312                    }
313                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
314                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
315                    ))),
316                    NDDataBuffer::F32(v) => {
317                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
318                    }
319                    NDDataBuffer::F64(v) => {
320                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
321                    }
322                };
323                if let Some(value) = value {
324                    let ts = report_arr.timestamp.to_system_time();
325                    self.port_handle
326                        .interrupts()
327                        .notify(asyn_rs::interrupt::InterruptValue {
328                            reason: param,
329                            addr: 0,
330                            value,
331                            timestamp: ts,
332                        });
333                }
334            }
335
336            let info = report_arr.info();
337            let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
338            self.port_handle.write_int32_no_wait(
339                self.ndarray_params.array_counter,
340                0,
341                self.array_counter,
342            );
343            self.port_handle.write_int32_no_wait(
344                self.ndarray_params.unique_id,
345                0,
346                report_arr.unique_id,
347            );
348            self.port_handle.write_int32_no_wait(
349                self.ndarray_params.n_dimensions,
350                0,
351                report_arr.dims.len() as i32,
352            );
353            self.port_handle.write_int32_no_wait(
354                self.ndarray_params.array_size_x,
355                0,
356                info.x_size as i32,
357            );
358            self.port_handle.write_int32_no_wait(
359                self.ndarray_params.array_size_y,
360                0,
361                info.y_size as i32,
362            );
363            self.port_handle.write_int32_no_wait(
364                self.ndarray_params.array_size_z,
365                0,
366                info.color_size as i32,
367            );
368            self.port_handle.write_int32_no_wait(
369                self.ndarray_params.array_size,
370                0,
371                info.total_bytes as i32,
372            );
373            self.port_handle.write_int32_no_wait(
374                self.ndarray_params.data_type,
375                0,
376                report_arr.data.data_type() as i32,
377            );
378            self.port_handle
379                .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
380
381            let ts_f64 = report_arr.timestamp.as_f64();
382            self.port_handle
383                .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
384            self.port_handle.write_int32_no_wait(
385                self.ndarray_params.epics_ts_sec,
386                0,
387                report_arr.timestamp.sec as i32,
388            );
389            self.port_handle.write_int32_no_wait(
390                self.ndarray_params.epics_ts_nsec,
391                0,
392                report_arr.timestamp.nsec as i32,
393            );
394        }
395
396        self.port_handle
397            .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
398
399        // Collect unique addrs that have updates (beyond addr 0 which is always flushed)
400        let mut extra_addrs: Vec<i32> = Vec::new();
401        for update in &param_updates {
402            match update {
403                ParamUpdate::Int32 {
404                    reason,
405                    addr,
406                    value,
407                } => {
408                    self.port_handle.write_int32_no_wait(*reason, *addr, *value);
409                    if *addr != 0 && !extra_addrs.contains(addr) {
410                        extra_addrs.push(*addr);
411                    }
412                }
413                ParamUpdate::Float64 {
414                    reason,
415                    addr,
416                    value,
417                } => {
418                    self.port_handle
419                        .write_float64_no_wait(*reason, *addr, *value);
420                    if *addr != 0 && !extra_addrs.contains(addr) {
421                        extra_addrs.push(*addr);
422                    }
423                }
424                ParamUpdate::Octet {
425                    reason,
426                    addr,
427                    value,
428                } => {
429                    let data = value.as_bytes().to_vec();
430                    let user = asyn_rs::user::AsynUser::new(*reason).with_addr(*addr);
431                    self.port_handle
432                        .submit_no_wait(asyn_rs::request::RequestOp::OctetWrite { data }, user);
433                }
434            }
435        }
436
437        self.port_handle.call_param_callbacks_no_wait(0);
438        for addr in extra_addrs {
439            self.port_handle.call_param_callbacks_no_wait(addr);
440        }
441    }
442}
443
444/// Type-erased handle for blocking mode: allows NDArraySender to call
445/// process_and_publish without knowing the concrete processor type.
446struct BlockingProcessorHandle<P: NDPluginProcess> {
447    inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
448}
449
450impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
451    fn process_and_publish(&self, array: &NDArray) {
452        self.inner.lock().process_and_publish(array);
453    }
454}
455
456/// PortDriver implementation for a plugin's control plane.
457#[allow(dead_code)]
458pub struct PluginPortDriver {
459    base: PortDriverBase,
460    ndarray_params: NDArrayDriverParams,
461    plugin_params: PluginBaseParams,
462    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
463    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
464    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
465    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
466    std_array_data_param: Option<usize>,
467}
468
469impl PluginPortDriver {
470    fn new<P: NDPluginProcess>(
471        port_name: &str,
472        plugin_type_name: &str,
473        queue_size: usize,
474        ndarray_port: &str,
475        max_addr: usize,
476        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
477        processor: &mut P,
478        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
479    ) -> AsynResult<Self> {
480        let mut base = PortDriverBase::new(
481            port_name,
482            max_addr,
483            PortFlags {
484                can_block: true,
485                ..Default::default()
486            },
487        );
488
489        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
490        let plugin_params = PluginBaseParams::create(&mut base)?;
491
492        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
493        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
494        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
495        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
496        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
497        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
498        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
499        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
500        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
501        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
502        base.set_int32_param(ndarray_params.capture, 0, 0)?;
503        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
504        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
505        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
506        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
507        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
508        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
509        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
510        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
511        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
512        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
513
514        // Set plugin identity params
515        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
516        if !ndarray_port.is_empty() {
517            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
518        }
519
520        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
521        let std_array_data_param = if array_data.is_some() {
522            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
523        } else {
524            None
525        };
526
527        // Let the processor register its plugin-specific params
528        processor.register_params(&mut base)?;
529
530        Ok(Self {
531            base,
532            ndarray_params,
533            plugin_params,
534            param_change_tx,
535            array_data,
536            std_array_data_param,
537        })
538    }
539}
540
/// Copy as many leading elements of `src` into `dst` as both slices can hold.
///
/// Returns the number of elements copied, i.e. the shorter of the two lengths.
/// Elements of `dst` beyond that count are left untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = src.len().min(dst.len());
    let (head, _untouched) = dst.split_at_mut(count);
    head.copy_from_slice(&src[..count]);
    count
}
547
548/// Convert and copy source slice into destination buffer element-by-element.
549fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
550where
551    S: CastToF64 + Copy,
552    D: CastFromF64 + Copy,
553{
554    let n = src.len().min(dst.len());
555    for i in 0..n {
556        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
557    }
558    n
559}
560
/// Conversion to `f64` with `as`-cast semantics (lossy for large i64/u64 values).
trait CastToF64 {
    /// Convert `self` to `f64`.
    fn cast_to_f64(self) -> f64;
}

/// Implement [`CastToF64`] with a plain `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64 {
    ($($ty:ty),* $(,)?) => {
        $(
            impl CastToF64 for $ty {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )*
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

impl CastToF64 for f64 {
    /// Identity: the value already is an `f64`.
    fn cast_to_f64(self) -> f64 {
        self
    }
}
616
/// Conversion from `f64` with `as`-cast semantics.
///
/// For the integer targets this means the fraction is dropped, out-of-range
/// values saturate at the target's bounds, and NaN becomes 0.
trait CastFromF64 {
    /// Convert `v` to `Self`.
    fn cast_from_f64(v: f64) -> Self;
}

/// Implement [`CastFromF64`] with a plain `as` cast for each listed type.
macro_rules! impl_cast_from_f64 {
    ($($ty:ty),* $(,)?) => {
        $(
            impl CastFromF64 for $ty {
                fn cast_from_f64(v: f64) -> Self {
                    v as $ty
                }
            }
        )*
    };
}

impl_cast_from_f64!(i8, i16, i32, f32);

impl CastFromF64 for f64 {
    /// Identity: the value already is an `f64`.
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
647
/// Copy the latest NDArray's data into an output buffer, converting the element type.
///
/// Expands to an expression that evaluates to `Ok(n)` with the number of elements
/// copied. It returns `Ok(0)` from the enclosing function when `$self.array_data`
/// is `None` or no array has been stored in the handle.
///
/// `$direct_variant` names the buffer variant whose element type equals the
/// destination's and is copied with `copy_direct`; every `$variant` after it is
/// converted with `copy_convert`.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        // The guard stays alive until the end of this block, covering the copy.
        let latest = match $self.array_data.as_ref() {
            Some(handle) => handle.lock(),
            None => return Ok(0),
        };
        let copied = match &*latest {
            Some(array) => match &array.data {
                NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
                $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
            },
            None => return Ok(0),
        };
        Ok(copied)
    }};
}
669
670impl PortDriver for PluginPortDriver {
671    fn base(&self) -> &PortDriverBase {
672        &self.base
673    }
674
675    fn base_mut(&mut self) -> &mut PortDriverBase {
676        &mut self.base
677    }
678
679    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
680        let reason = user.reason;
681        let addr = user.addr;
682        self.base.set_int32_param(reason, addr, value)?;
683        self.base.call_param_callbacks(addr)?;
684        let _ = self
685            .param_change_tx
686            .try_send((reason, addr, ParamChangeValue::Int32(value)));
687        Ok(())
688    }
689
690    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
691        let reason = user.reason;
692        let addr = user.addr;
693        self.base.set_float64_param(reason, addr, value)?;
694        self.base.call_param_callbacks(addr)?;
695        let _ = self
696            .param_change_tx
697            .try_send((reason, addr, ParamChangeValue::Float64(value)));
698        Ok(())
699    }
700
701    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
702        let reason = user.reason;
703        let addr = user.addr;
704        let s = String::from_utf8_lossy(data).into_owned();
705        self.base.set_string_param(reason, addr, s.clone())?;
706        self.base.call_param_callbacks(addr)?;
707        let _ = self
708            .param_change_tx
709            .try_send((reason, addr, ParamChangeValue::Octet(s)));
710        Ok(())
711    }
712
713    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
714        impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
715    }
716
717    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
718        impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
719    }
720
721    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
722        impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
723    }
724
725    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
726        impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
727    }
728
729    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
730        impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
731    }
732}
733
734/// Handle to a running plugin runtime. Provides access to sender and port handle.
735#[derive(Clone)]
736pub struct PluginRuntimeHandle {
737    port_runtime: PortRuntimeHandle,
738    array_sender: NDArraySender,
739    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
740    port_name: String,
741    pub ndarray_params: NDArrayDriverParams,
742    pub plugin_params: PluginBaseParams,
743}
744
745impl PluginRuntimeHandle {
746    pub fn port_runtime(&self) -> &PortRuntimeHandle {
747        &self.port_runtime
748    }
749
750    pub fn array_sender(&self) -> &NDArraySender {
751        &self.array_sender
752    }
753
754    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
755        &self.array_output
756    }
757
758    pub fn port_name(&self) -> &str {
759        &self.port_name
760    }
761}
762
763/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
764///
765/// Returns:
766/// - `PluginRuntimeHandle` for wiring and control
767/// - `PortRuntimeHandle` for param I/O
768/// - `JoinHandle` for the data processing thread
769pub fn create_plugin_runtime<P: NDPluginProcess>(
770    port_name: &str,
771    processor: P,
772    pool: Arc<NDArrayPool>,
773    queue_size: usize,
774    ndarray_port: &str,
775    wiring: Arc<WiringRegistry>,
776) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
777    create_plugin_runtime_multi_addr(
778        port_name,
779        processor,
780        pool,
781        queue_size,
782        ndarray_port,
783        wiring,
784        1,
785    )
786}
787
788/// Create a plugin runtime with multi-addr support.
789///
790/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
791pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
792    port_name: &str,
793    mut processor: P,
794    pool: Arc<NDArrayPool>,
795    queue_size: usize,
796    ndarray_port: &str,
797    wiring: Arc<WiringRegistry>,
798    max_addr: usize,
799) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
800    // Param change channel (control plane -> data plane)
801    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
802
803    // Capture plugin type and array data handle before mutable borrow
804    let plugin_type_name = processor.plugin_type().to_string();
805    let array_data = processor.array_data_handle();
806
807    // Create the port driver for control plane
808    let driver = PluginPortDriver::new(
809        port_name,
810        &plugin_type_name,
811        queue_size,
812        ndarray_port,
813        max_addr,
814        param_tx,
815        &mut processor,
816        array_data,
817    )
818    .expect("failed to create plugin port driver");
819
820    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
821    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
822    let ndarray_params = driver.ndarray_params;
823    let plugin_params = driver.plugin_params;
824    let std_array_data_param = driver.std_array_data_param;
825
826    // Create port runtime (actor thread for param I/O)
827    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
828
829    // Clone port handle for the data thread to write params back
830    let port_handle = port_runtime.port_handle().clone();
831
832    // Array channel (data plane)
833    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
834
835    // Shared mode flags
836    let enabled = Arc::new(AtomicBool::new(false));
837    let blocking_mode = Arc::new(AtomicBool::new(false));
838
839    // Shared processor (accessible from both data thread and caller thread)
840    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
841    let array_output_for_handle = array_output.clone();
842    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
843        processor,
844        output: array_output,
845        pool,
846        ndarray_params,
847        plugin_params,
848        port_handle,
849        array_counter: 0,
850        std_array_data_param,
851    }));
852
853    // Type-erased handle for blocking mode
854    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
855        inner: shared.clone(),
856    });
857
858    let data_enabled = enabled.clone();
859    let data_blocking = blocking_mode.clone();
860    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
861
862    // Capture wiring info for data loop
863    let nd_array_port_reason = plugin_params.nd_array_port;
864    let sender_port_name = port_name.to_string();
865    let initial_upstream = ndarray_port.to_string();
866
867    // Spawn data processing thread
868    let data_jh = thread::Builder::new()
869        .name(format!("plugin-data-{port_name}"))
870        .spawn(move || {
871            plugin_data_loop(
872                shared,
873                array_rx,
874                param_rx,
875                enable_callbacks_reason,
876                blocking_callbacks_reason,
877                data_enabled,
878                data_blocking,
879                nd_array_port_reason,
880                sender_port_name,
881                initial_upstream,
882                wiring,
883            );
884        })
885        .expect("failed to spawn plugin data thread");
886
887    let handle = PluginRuntimeHandle {
888        port_runtime,
889        array_sender,
890        array_output: array_output_for_handle,
891        port_name: port_name.to_string(),
892        ndarray_params,
893        plugin_params,
894    };
895
896    (handle, data_jh)
897}
898
899fn plugin_data_loop<P: NDPluginProcess>(
900    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
901    mut array_rx: NDArrayReceiver,
902    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
903    enable_callbacks_reason: usize,
904    blocking_callbacks_reason: usize,
905    enabled: Arc<AtomicBool>,
906    blocking_mode: Arc<AtomicBool>,
907    nd_array_port_reason: usize,
908    sender_port_name: String,
909    initial_upstream: String,
910    wiring: Arc<WiringRegistry>,
911) {
912    let mut current_upstream = initial_upstream;
913    let rt = tokio::runtime::Builder::new_current_thread()
914        .enable_all()
915        .build()
916        .unwrap();
917    rt.block_on(async {
918        loop {
919            tokio::select! {
920                msg = array_rx.recv_msg() => {
921                    match msg {
922                        Some(msg) => {
923                            // In blocking mode, arrays are processed inline by the caller.
924                            // Skip processing here to avoid double-processing.
925                            if !blocking_mode.load(Ordering::Acquire) {
926                                shared.lock().process_and_publish(&msg.array);
927                            }
928                            // msg dropped here → completion signaled (if tracked)
929                        }
930                        None => break,
931                    }
932                }
933                param = param_rx.recv() => {
934                    match param {
935                        Some((reason, addr, value)) => {
936                            if reason == enable_callbacks_reason {
937                                enabled.store(value.as_i32() != 0, Ordering::Release);
938                            }
939                            if reason == blocking_callbacks_reason {
940                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
941                            }
942                            // Handle NDArrayPort rewiring
943                            if reason == nd_array_port_reason {
944                                if let Some(new_port) = value.as_string() {
945                                    if new_port != current_upstream {
946                                        let old = std::mem::replace(&mut current_upstream, new_port.to_string());
947                                        if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
948                                            eprintln!("NDArrayPort rewire failed: {e}");
949                                            current_upstream = old;
950                                        }
951                                    }
952                                }
953                            }
954                            let snapshot = PluginParamSnapshot {
955                                enable_callbacks: enabled.load(Ordering::Acquire),
956                                reason,
957                                addr,
958                                value,
959                            };
960                            let mut guard = shared.lock();
961                            let t0 = std::time::Instant::now();
962                            let result = guard.processor.on_param_change(reason, &snapshot);
963                            let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
964                            if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
965                                guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
966                            }
967                            drop(guard);
968                        }
969                        None => break,
970                    }
971                }
972            }
973        }
974    });
975}
976
977/// Connect a downstream plugin's sender to a plugin runtime's output.
978pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
979    upstream.array_output().lock().add(downstream_sender);
980}
981
982/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
983pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
984    port_name: &str,
985    mut processor: P,
986    pool: Arc<NDArrayPool>,
987    queue_size: usize,
988    output: NDArrayOutput,
989    ndarray_port: &str,
990    wiring: Arc<WiringRegistry>,
991) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
992    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
993
994    let plugin_type_name = processor.plugin_type().to_string();
995    let array_data = processor.array_data_handle();
996    let driver = PluginPortDriver::new(
997        port_name,
998        &plugin_type_name,
999        queue_size,
1000        ndarray_port,
1001        1,
1002        param_tx,
1003        &mut processor,
1004        array_data,
1005    )
1006    .expect("failed to create plugin port driver");
1007
1008    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1009    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1010    let ndarray_params = driver.ndarray_params;
1011    let plugin_params = driver.plugin_params;
1012    let std_array_data_param = driver.std_array_data_param;
1013
1014    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1015
1016    let port_handle = port_runtime.port_handle().clone();
1017
1018    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1019
1020    let enabled = Arc::new(AtomicBool::new(false));
1021    let blocking_mode = Arc::new(AtomicBool::new(false));
1022
1023    let array_output = Arc::new(parking_lot::Mutex::new(output));
1024    let array_output_for_handle = array_output.clone();
1025    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1026        processor,
1027        output: array_output,
1028        pool,
1029        ndarray_params,
1030        plugin_params,
1031        port_handle,
1032        array_counter: 0,
1033        std_array_data_param,
1034    }));
1035
1036    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1037        inner: shared.clone(),
1038    });
1039
1040    let data_enabled = enabled.clone();
1041    let data_blocking = blocking_mode.clone();
1042    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1043
1044    // Capture wiring info for data loop
1045    let nd_array_port_reason = plugin_params.nd_array_port;
1046    let sender_port_name = port_name.to_string();
1047    let initial_upstream = ndarray_port.to_string();
1048
1049    let data_jh = thread::Builder::new()
1050        .name(format!("plugin-data-{port_name}"))
1051        .spawn(move || {
1052            plugin_data_loop(
1053                shared,
1054                array_rx,
1055                param_rx,
1056                enable_callbacks_reason,
1057                blocking_callbacks_reason,
1058                data_enabled,
1059                data_blocking,
1060                nd_array_port_reason,
1061                sender_port_name,
1062                initial_upstream,
1063                wiring,
1064            );
1065        })
1066        .expect("failed to spawn plugin data thread");
1067
1068    let handle = PluginRuntimeHandle {
1069        port_runtime,
1070        array_sender,
1071        array_output: array_output_for_handle,
1072        port_name: port_name.to_string(),
1073        ndarray_params,
1074        plugin_params,
1075    };
1076
1077    (handle, data_jh)
1078}
1079
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::ndarray_channel;
    use std::time::Duration;

    /// Pause after enabling callbacks, giving the data thread time to see the write.
    const ENABLE_SETTLE: Duration = Duration::from_millis(10);

    /// Pause after any other parameter write before arrays are sent.
    const PARAM_SETTLE: Duration = Duration::from_millis(50);

    /// Passthrough processor: returns the input array as-is.
    struct PassthroughProcessor;

    impl NDPluginProcess for PassthroughProcessor {
        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            ProcessResult::arrays(vec![Arc::new(array.clone())])
        }
        fn plugin_type(&self) -> &str {
            "Passthrough"
        }
    }

    /// Sink processor: consumes arrays, returns nothing.
    struct SinkProcessor {
        count: usize,
    }

    impl NDPluginProcess for SinkProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count += 1;
            ProcessResult::empty()
        }
        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }

    /// Build a small one-dimensional `UInt8` array tagged with `id`.
    fn make_test_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Fresh, empty wiring registry.
    fn test_wiring() -> Arc<WiringRegistry> {
        Arc::new(WiringRegistry::new())
    }

    /// Array pool shared by all tests in this module.
    fn test_pool() -> Arc<NDArrayPool> {
        Arc::new(NDArrayPool::new(1_000_000))
    }

    /// Current-thread Tokio runtime for tests that need an async context.
    fn current_thread_rt() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    /// Write an int32 parameter at address 0 through the port handle, then
    /// sleep for `settle` so the data thread can pick the change up.
    fn write_int32_and_settle(
        handle: &PluginRuntimeHandle,
        reason: usize,
        value: i32,
        settle: Duration,
    ) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(reason, 0, value)
            .unwrap();
        std::thread::sleep(settle);
    }

    /// Enable callbacks on a plugin handle (plugins default to disabled).
    fn enable_callbacks(handle: &PluginRuntimeHandle) {
        write_int32_and_settle(
            handle,
            handle.plugin_params.enable_callbacks,
            1,
            ENABLE_SETTLE,
        );
    }

    /// Switch blocking-callbacks mode on or off.
    fn set_blocking_callbacks(handle: &PluginRuntimeHandle, on: bool) {
        write_int32_and_settle(
            handle,
            handle.plugin_params.blocking_callbacks,
            i32::from(on),
            PARAM_SETTLE,
        );
    }

    /// Create a runtime for `processor` whose output is pre-wired to a single
    /// "DOWNSTREAM" channel, and return the handle plus that channel's receiver.
    ///
    /// The data thread's join handle is dropped here, which detaches the thread.
    fn spawn_with_downstream<P: NDPluginProcess>(
        port_name: &str,
        processor: P,
    ) -> (PluginRuntimeHandle, NDArrayReceiver) {
        let (downstream_sender, downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let mut output = NDArrayOutput::new();
        output.add(downstream_sender);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            port_name,
            processor,
            test_pool(),
            10,
            output,
            "",
            test_wiring(),
        );
        (handle, downstream_rx)
    }

    #[test]
    fn test_passthrough_runtime() {
        let (handle, mut downstream_rx) = spawn_with_downstream("PASS1", PassthroughProcessor);
        enable_callbacks(&handle);

        // Send an array
        handle.array_sender().send(make_test_array(42));

        // Should come out the other side
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 42);
    }

    #[test]
    fn test_sink_runtime() {
        let (handle, _data_jh) = create_plugin_runtime(
            "SINK1",
            SinkProcessor { count: 0 },
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        // Send arrays - they should be consumed silently
        handle.array_sender().send(make_test_array(1));
        handle.array_sender().send(make_test_array(2));

        // Give processing thread time
        std::thread::sleep(PARAM_SETTLE);

        // No crash, no output needed
        assert_eq!(handle.port_name(), "SINK1");
    }

    #[test]
    fn test_plugin_type_param() {
        let (handle, _data_jh) = create_plugin_runtime(
            "TYPE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        // Verify port name
        assert_eq!(handle.port_name(), "TYPE_TEST");
        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
    }

    #[test]
    fn test_shutdown_on_handle_drop() {
        let (handle, data_jh) = create_plugin_runtime(
            "SHUTDOWN_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        // Drop the handle (closes sender channel, which should cause data thread to exit)
        let sender = handle.array_sender().clone();
        drop(handle);
        drop(sender);

        // Data thread should terminate
        let result = data_jh.join();
        assert!(result.is_ok());
    }

    #[test]
    fn test_dropped_count_when_queue_full() {
        // Very slow processor
        struct SlowProcessor;
        impl NDPluginProcess for SlowProcessor {
            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                std::thread::sleep(Duration::from_millis(100));
                ProcessResult::empty()
            }
            fn plugin_type(&self) -> &str {
                "Slow"
            }
        }

        let (handle, _data_jh) = create_plugin_runtime(
            "DROP_TEST",
            SlowProcessor,
            test_pool(),
            1,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        // Fill the queue and overflow
        for i in 0..10 {
            handle.array_sender().send(make_test_array(i));
        }

        // Some should have been dropped
        assert!(handle.array_sender().dropped_count() > 0);
    }

    #[test]
    fn test_blocking_callbacks_basic() {
        let (handle, mut downstream_rx) =
            spawn_with_downstream("BLOCK_TEST", PassthroughProcessor);
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        // In blocking mode, send() processes inline and returns synchronously
        handle.array_sender().send(make_test_array(42));

        // Array should already be in the downstream channel
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 42);
    }

    #[test]
    fn test_blocking_to_nonblocking_switch() {
        let (handle, mut downstream_rx) =
            spawn_with_downstream("SWITCH_TEST", PassthroughProcessor);
        enable_callbacks(&handle);

        // Start in blocking mode
        set_blocking_callbacks(&handle, true);

        handle.array_sender().send(make_test_array(1));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 1);

        // Switch back to non-blocking
        set_blocking_callbacks(&handle, false);

        // Send in non-blocking mode — goes through channel to data thread
        handle.array_sender().send(make_test_array(2));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 2);
    }

    #[test]
    fn test_enable_callbacks_disables_processing() {
        let (handle, mut downstream_rx) =
            spawn_with_downstream("ENABLE_TEST", PassthroughProcessor);

        // Disable callbacks
        write_int32_and_settle(
            &handle,
            handle.plugin_params.enable_callbacks,
            0,
            PARAM_SETTLE,
        );

        // Send array — should be silently dropped by sender
        handle.array_sender().send(make_test_array(99));

        // Verify nothing received (with timeout)
        let rt = current_thread_rt();
        let result = rt.block_on(async {
            tokio::time::timeout(Duration::from_millis(100), downstream_rx.recv()).await
        });
        assert!(
            result.is_err(),
            "should not receive array when callbacks disabled"
        );
    }

    #[test]
    fn test_blocking_downstream_receives() {
        // Two downstream consumers, so the shared single-downstream helper
        // does not apply here.
        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
        let mut output = NDArrayOutput::new();
        output.add(ds1);
        output.add(ds2);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "BLOCK_DS_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        handle.array_sender().send(make_test_array(77));

        // Both downstream receivers should have the array
        let r1 = rx1.blocking_recv().unwrap();
        let r2 = rx2.blocking_recv().unwrap();
        assert_eq!(r1.unique_id, 77);
        assert_eq!(r2.unique_id, 77);
    }

    #[test]
    fn test_blocking_param_updates() {
        struct ParamTracker;
        impl NDPluginProcess for ParamTracker {
            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                ProcessResult::arrays(vec![Arc::new(array.clone())])
            }
            fn plugin_type(&self) -> &str {
                "ParamTracker"
            }
        }

        let (handle, mut downstream_rx) = spawn_with_downstream("PARAM_TEST", ParamTracker);
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        // Send array in blocking mode
        handle.array_sender().send(make_test_array(1));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 1);

        // Write enable_callbacks while in blocking mode — should not crash
        write_int32_and_settle(
            &handle,
            handle.plugin_params.enable_callbacks,
            1,
            PARAM_SETTLE,
        );

        // Still works after param update
        handle.array_sender().send(make_test_array(2));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 2);
    }

    /// Phase 0 regression test: process_and_publish inside a current-thread runtime must not panic.
    #[test]
    fn test_no_panic_in_current_thread_runtime() {
        let (handle, mut downstream_rx) =
            spawn_with_downstream("CURRENT_THREAD_TEST", PassthroughProcessor);
        enable_callbacks(&handle);

        // Enable blocking mode so process_and_publish runs inline
        set_blocking_callbacks(&handle, true);

        // Call send (which calls process_and_publish inline) from inside a current-thread runtime
        let rt = current_thread_rt();
        rt.block_on(async {
            handle.array_sender().send(make_test_array(99));
        });

        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 99);
    }
}