Skip to main content

ad_core_rs/plugin/
runtime.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread;
4
5use asyn_rs::error::AsynResult;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::runtime::config::RuntimeConfig;
8use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
9use asyn_rs::user::AsynUser;
10
11use asyn_rs::port_handle::PortHandle;
12
13use crate::ndarray::NDArray;
14use crate::ndarray_pool::NDArrayPool;
15use crate::params::ndarray_driver::NDArrayDriverParams;
16
17use super::channel::{
18    BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
19};
20use super::params::PluginBaseParams;
21use super::wiring::WiringRegistry;
22
23/// Value sent through the param change channel from control plane to data plane.
24#[derive(Debug, Clone)]
25pub enum ParamChangeValue {
26    Int32(i32),
27    Float64(f64),
28    Octet(String),
29}
30
31impl ParamChangeValue {
32    pub fn as_i32(&self) -> i32 {
33        match self {
34            ParamChangeValue::Int32(v) => *v,
35            ParamChangeValue::Float64(v) => *v as i32,
36            ParamChangeValue::Octet(_) => 0,
37        }
38    }
39
40    pub fn as_f64(&self) -> f64 {
41        match self {
42            ParamChangeValue::Int32(v) => *v as f64,
43            ParamChangeValue::Float64(v) => *v,
44            ParamChangeValue::Octet(_) => 0.0,
45        }
46    }
47
48    pub fn as_string(&self) -> Option<&str> {
49        match self {
50            ParamChangeValue::Octet(s) => Some(s),
51            _ => None,
52        }
53    }
54}
55
56/// A single parameter update produced by a plugin's process_array.
57pub enum ParamUpdate {
58    Int32 {
59        reason: usize,
60        addr: i32,
61        value: i32,
62    },
63    Float64 {
64        reason: usize,
65        addr: i32,
66        value: f64,
67    },
68    Octet {
69        reason: usize,
70        addr: i32,
71        value: String,
72    },
73}
74
75impl ParamUpdate {
76    /// Create an Int32 update at addr 0.
77    pub fn int32(reason: usize, value: i32) -> Self {
78        Self::Int32 {
79            reason,
80            addr: 0,
81            value,
82        }
83    }
84    /// Create a Float64 update at addr 0.
85    pub fn float64(reason: usize, value: f64) -> Self {
86        Self::Float64 {
87            reason,
88            addr: 0,
89            value,
90        }
91    }
92    /// Create an Int32 update at a specific addr.
93    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
94        Self::Int32 {
95            reason,
96            addr,
97            value,
98        }
99    }
100    /// Create a Float64 update at a specific addr.
101    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
102        Self::Float64 {
103            reason,
104            addr,
105            value,
106        }
107    }
108}
109
110/// Result of processing one array: output arrays + param updates to write back.
111pub struct ProcessResult {
112    pub output_arrays: Vec<Arc<NDArray>>,
113    pub param_updates: Vec<ParamUpdate>,
114    /// If set, only publish to the subscriber at this index (round-robin scatter).
115    pub scatter_index: Option<usize>,
116}
117
118impl ProcessResult {
119    /// Convenience: sink plugin with only param updates, no output arrays.
120    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
121        Self {
122            output_arrays: vec![],
123            param_updates,
124            scatter_index: None,
125        }
126    }
127
128    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
129    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
130        Self {
131            output_arrays,
132            param_updates: vec![],
133            scatter_index: None,
134        }
135    }
136
137    /// Convenience: no outputs, no param updates.
138    pub fn empty() -> Self {
139        Self {
140            output_arrays: vec![],
141            param_updates: vec![],
142            scatter_index: None,
143        }
144    }
145
146    /// Convenience: scatter output — send to a single subscriber by index.
147    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
148        Self {
149            output_arrays,
150            param_updates: vec![],
151            scatter_index: Some(index),
152        }
153    }
154}
155
156/// Result of handling a control-plane param change.
157pub struct ParamChangeResult {
158    pub output_arrays: Vec<Arc<NDArray>>,
159    pub param_updates: Vec<ParamUpdate>,
160}
161
162impl ParamChangeResult {
163    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
164        Self {
165            output_arrays: vec![],
166            param_updates,
167        }
168    }
169
170    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
171        Self {
172            output_arrays,
173            param_updates: vec![],
174        }
175    }
176
177    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
178        Self {
179            output_arrays,
180            param_updates,
181        }
182    }
183
184    pub fn empty() -> Self {
185        Self {
186            output_arrays: vec![],
187            param_updates: vec![],
188        }
189    }
190}
191
192/// Pure processing logic. No threading concerns.
193pub trait NDPluginProcess: Send + 'static {
194    /// Process one array. Return output arrays and param updates.
195    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
196
197    /// Plugin type name for PLUGIN_TYPE param.
198    fn plugin_type(&self) -> &str;
199
200    /// Register plugin-specific params on the base. Called once during construction.
201    fn register_params(
202        &mut self,
203        _base: &mut PortDriverBase,
204    ) -> Result<(), asyn_rs::error::AsynError> {
205        Ok(())
206    }
207
208    /// Called when a param changes. Reason is the param index.
209    /// Return param updates to be written back to the port driver.
210    fn on_param_change(
211        &mut self,
212        _reason: usize,
213        _params: &PluginParamSnapshot,
214    ) -> ParamChangeResult {
215        ParamChangeResult::empty()
216    }
217
218    /// Return a handle to the latest NDArray data for array reads.
219    /// Override this in plugins like NDPluginStdArrays that serve pixel data
220    /// via readInt8Array/readInt16Array/etc.
221    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
222        None
223    }
224}
225
226/// Read-only snapshot of param values available to the processing thread.
227pub struct PluginParamSnapshot {
228    pub enable_callbacks: bool,
229    /// The param reason that changed.
230    pub reason: usize,
231    /// The address (sub-device) that changed.
232    pub addr: i32,
233    /// The new value.
234    pub value: ParamChangeValue,
235}
236
237/// Shared processor state protected by a mutex, accessible from both
238/// the data thread (non-blocking mode) and the caller thread (blocking mode).
239struct SharedProcessorInner<P: NDPluginProcess> {
240    processor: P,
241    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
242    pool: Arc<NDArrayPool>,
243    ndarray_params: NDArrayDriverParams,
244    plugin_params: PluginBaseParams,
245    port_handle: PortHandle,
246    array_counter: i32,
247    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
248    std_array_data_param: Option<usize>,
249}
250
251impl<P: NDPluginProcess> SharedProcessorInner<P> {
252    fn process_and_publish(&mut self, array: &NDArray) {
253        let t0 = std::time::Instant::now();
254        let result = self.processor.process_array(array, &self.pool);
255        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
256        self.publish_result(
257            result.output_arrays,
258            result.param_updates,
259            result.scatter_index,
260            Some(array),
261            elapsed_ms,
262        );
263    }
264
265    fn publish_result(
266        &mut self,
267        output_arrays: Vec<Arc<NDArray>>,
268        param_updates: Vec<ParamUpdate>,
269        scatter_index: Option<usize>,
270        fallback_array: Option<&NDArray>,
271        elapsed_ms: f64,
272    ) {
273        let output = self.output.lock();
274        for out in &output_arrays {
275            if let Some(idx) = scatter_index {
276                output.publish_to(idx, out.clone());
277            } else {
278                output.publish(out.clone());
279            }
280        }
281        drop(output);
282
283        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
284            self.array_counter += 1;
285
286            // Fire array data interrupt directly (C EPICS pattern).
287            // Bypasses port actor channel to avoid dropping large array messages.
288            if let Some(param) = self.std_array_data_param {
289                use crate::ndarray::NDDataBuffer;
290                use asyn_rs::param::ParamValue;
291                let value = match &report_arr.data {
292                    NDDataBuffer::I8(v) => {
293                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
294                    }
295                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
296                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
297                    ))),
298                    NDDataBuffer::I16(v) => {
299                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
300                    }
301                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
302                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
303                    ))),
304                    NDDataBuffer::I32(v) => {
305                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
306                    }
307                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
308                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
309                    ))),
310                    NDDataBuffer::I64(v) => {
311                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
312                    }
313                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
314                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
315                    ))),
316                    NDDataBuffer::F32(v) => {
317                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
318                    }
319                    NDDataBuffer::F64(v) => {
320                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
321                    }
322                };
323                if let Some(value) = value {
324                    let ts = report_arr.timestamp.to_system_time();
325                    self.port_handle
326                        .interrupts()
327                        .notify(asyn_rs::interrupt::InterruptValue {
328                            reason: param,
329                            addr: 0,
330                            value,
331                            timestamp: ts,
332                        });
333                }
334            }
335
336            let info = report_arr.info();
337            let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
338            self.port_handle.write_int32_no_wait(
339                self.ndarray_params.array_counter,
340                0,
341                self.array_counter,
342            );
343            self.port_handle.write_int32_no_wait(
344                self.ndarray_params.unique_id,
345                0,
346                report_arr.unique_id,
347            );
348            self.port_handle.write_int32_no_wait(
349                self.ndarray_params.n_dimensions,
350                0,
351                report_arr.dims.len() as i32,
352            );
353            self.port_handle.write_int32_no_wait(
354                self.ndarray_params.array_size_x,
355                0,
356                info.x_size as i32,
357            );
358            self.port_handle.write_int32_no_wait(
359                self.ndarray_params.array_size_y,
360                0,
361                info.y_size as i32,
362            );
363            self.port_handle.write_int32_no_wait(
364                self.ndarray_params.array_size_z,
365                0,
366                info.color_size as i32,
367            );
368            self.port_handle.write_int32_no_wait(
369                self.ndarray_params.array_size,
370                0,
371                info.total_bytes as i32,
372            );
373            self.port_handle.write_int32_no_wait(
374                self.ndarray_params.data_type,
375                0,
376                report_arr.data.data_type() as i32,
377            );
378            self.port_handle
379                .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
380
381            let ts_f64 = report_arr.timestamp.as_f64();
382            self.port_handle
383                .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
384            self.port_handle.write_int32_no_wait(
385                self.ndarray_params.epics_ts_sec,
386                0,
387                report_arr.timestamp.sec as i32,
388            );
389            self.port_handle.write_int32_no_wait(
390                self.ndarray_params.epics_ts_nsec,
391                0,
392                report_arr.timestamp.nsec as i32,
393            );
394        }
395
396        self.port_handle
397            .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
398
399        // Set params directly and fire callbacks — no writeInt32/on_param_change re-entrancy.
400        // This mirrors C ADCore's setIntegerParam + callParamCallbacks pattern.
401        use asyn_rs::request::ParamSetValue;
402
403        let mut addr0_updates: Vec<ParamSetValue> = Vec::new();
404        let mut extra_addr_map: std::collections::HashMap<i32, Vec<ParamSetValue>> =
405            std::collections::HashMap::new();
406
407        for update in &param_updates {
408            match update {
409                ParamUpdate::Int32 {
410                    reason,
411                    addr,
412                    value,
413                } => {
414                    let pv = ParamSetValue::Int32 {
415                        reason: *reason,
416                        addr: *addr,
417                        value: *value,
418                    };
419                    if *addr == 0 {
420                        addr0_updates.push(pv);
421                    } else {
422                        extra_addr_map.entry(*addr).or_default().push(pv);
423                    }
424                }
425                ParamUpdate::Float64 {
426                    reason,
427                    addr,
428                    value,
429                } => {
430                    let pv = ParamSetValue::Float64 {
431                        reason: *reason,
432                        addr: *addr,
433                        value: *value,
434                    };
435                    if *addr == 0 {
436                        addr0_updates.push(pv);
437                    } else {
438                        extra_addr_map.entry(*addr).or_default().push(pv);
439                    }
440                }
441                ParamUpdate::Octet {
442                    reason,
443                    addr,
444                    value,
445                } => {
446                    let pv = ParamSetValue::Octet {
447                        reason: *reason,
448                        addr: *addr,
449                        value: value.clone(),
450                    };
451                    if *addr == 0 {
452                        addr0_updates.push(pv);
453                    } else {
454                        extra_addr_map.entry(*addr).or_default().push(pv);
455                    }
456                }
457            }
458        }
459
460        self.port_handle.set_params_and_notify(0, addr0_updates);
461        for (addr, updates) in extra_addr_map {
462            self.port_handle.set_params_and_notify(addr, updates);
463        }
464    }
465}
466
467/// Type-erased handle for blocking mode: allows NDArraySender to call
468/// process_and_publish without knowing the concrete processor type.
469struct BlockingProcessorHandle<P: NDPluginProcess> {
470    inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
471}
472
473impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
474    fn process_and_publish(&self, array: &NDArray) {
475        self.inner.lock().process_and_publish(array);
476    }
477}
478
479/// PortDriver implementation for a plugin's control plane.
480#[allow(dead_code)]
481pub struct PluginPortDriver {
482    base: PortDriverBase,
483    ndarray_params: NDArrayDriverParams,
484    plugin_params: PluginBaseParams,
485    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
486    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
487    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
488    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
489    std_array_data_param: Option<usize>,
490}
491
492impl PluginPortDriver {
493    fn new<P: NDPluginProcess>(
494        port_name: &str,
495        plugin_type_name: &str,
496        queue_size: usize,
497        ndarray_port: &str,
498        max_addr: usize,
499        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
500        processor: &mut P,
501        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
502    ) -> AsynResult<Self> {
503        let mut base = PortDriverBase::new(
504            port_name,
505            max_addr,
506            PortFlags {
507                can_block: true,
508                ..Default::default()
509            },
510        );
511
512        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
513        let plugin_params = PluginBaseParams::create(&mut base)?;
514
515        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
516        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
517        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
518        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
519        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
520        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
521        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
522        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
523        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
524        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
525        base.set_int32_param(ndarray_params.capture, 0, 0)?;
526        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
527        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
528        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
529        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
530        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
531        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
532        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
533        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
534        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
535        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
536
537        // Set plugin identity params
538        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
539        if !ndarray_port.is_empty() {
540            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
541        }
542
543        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
544        let std_array_data_param = if array_data.is_some() {
545            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
546        } else {
547            None
548        };
549
550        // Let the processor register its plugin-specific params
551        processor.register_params(&mut base)?;
552
553        Ok(Self {
554            base,
555            ndarray_params,
556            plugin_params,
557            param_change_tx,
558            array_data,
559            std_array_data_param,
560        })
561    }
562}
563
564/// Copy source slice directly into destination buffer, returning elements copied.
565fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
566    let n = src.len().min(dst.len());
567    dst[..n].copy_from_slice(&src[..n]);
568    n
569}
570
571/// Convert and copy source slice into destination buffer element-by-element.
572fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
573where
574    S: CastToF64 + Copy,
575    D: CastFromF64 + Copy,
576{
577    let n = src.len().min(dst.len());
578    for i in 0..n {
579        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
580    }
581    n
582}
583
584/// Helper trait for `as f64` casts (handles lossy conversions like i64/u64).
585trait CastToF64 {
586    fn cast_to_f64(self) -> f64;
587}
588
589impl CastToF64 for i8 {
590    fn cast_to_f64(self) -> f64 {
591        self as f64
592    }
593}
594impl CastToF64 for u8 {
595    fn cast_to_f64(self) -> f64 {
596        self as f64
597    }
598}
599impl CastToF64 for i16 {
600    fn cast_to_f64(self) -> f64 {
601        self as f64
602    }
603}
604impl CastToF64 for u16 {
605    fn cast_to_f64(self) -> f64 {
606        self as f64
607    }
608}
609impl CastToF64 for i32 {
610    fn cast_to_f64(self) -> f64 {
611        self as f64
612    }
613}
614impl CastToF64 for u32 {
615    fn cast_to_f64(self) -> f64 {
616        self as f64
617    }
618}
619impl CastToF64 for i64 {
620    fn cast_to_f64(self) -> f64 {
621        self as f64
622    }
623}
624impl CastToF64 for u64 {
625    fn cast_to_f64(self) -> f64 {
626        self as f64
627    }
628}
629impl CastToF64 for f32 {
630    fn cast_to_f64(self) -> f64 {
631        self as f64
632    }
633}
634impl CastToF64 for f64 {
635    fn cast_to_f64(self) -> f64 {
636        self
637    }
638}
639
640/// Helper trait for `as` casts from f64.
641trait CastFromF64 {
642    fn cast_from_f64(v: f64) -> Self;
643}
644
645impl CastFromF64 for i8 {
646    fn cast_from_f64(v: f64) -> Self {
647        v as i8
648    }
649}
650impl CastFromF64 for i16 {
651    fn cast_from_f64(v: f64) -> Self {
652        v as i16
653    }
654}
655impl CastFromF64 for i32 {
656    fn cast_from_f64(v: f64) -> Self {
657        v as i32
658    }
659}
660impl CastFromF64 for f32 {
661    fn cast_from_f64(v: f64) -> Self {
662        v as f32
663    }
664}
665impl CastFromF64 for f64 {
666    fn cast_from_f64(v: f64) -> Self {
667        v
668    }
669}
670
671/// Copy NDArray data into the output buffer with type conversion.
672/// Returns the number of elements copied, or 0 if no data is available.
673macro_rules! impl_read_array {
674    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
675        use crate::ndarray::NDDataBuffer;
676        let handle = match &$self.array_data {
677            Some(h) => h,
678            None => return Ok(0),
679        };
680        let guard = handle.lock();
681        let array = match &*guard {
682            Some(a) => a,
683            None => return Ok(0),
684        };
685        let n = match &array.data {
686            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
687            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
688        };
689        Ok(n)
690    }};
691}
692
693impl PortDriver for PluginPortDriver {
694    fn base(&self) -> &PortDriverBase {
695        &self.base
696    }
697
698    fn base_mut(&mut self) -> &mut PortDriverBase {
699        &mut self.base
700    }
701
702    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
703        let reason = user.reason;
704        let addr = user.addr;
705        self.base.set_int32_param(reason, addr, value)?;
706        self.base.call_param_callbacks(addr)?;
707        let _ = self
708            .param_change_tx
709            .try_send((reason, addr, ParamChangeValue::Int32(value)));
710        Ok(())
711    }
712
713    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
714        let reason = user.reason;
715        let addr = user.addr;
716        self.base.set_float64_param(reason, addr, value)?;
717        self.base.call_param_callbacks(addr)?;
718        let _ = self
719            .param_change_tx
720            .try_send((reason, addr, ParamChangeValue::Float64(value)));
721        Ok(())
722    }
723
724    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
725        let reason = user.reason;
726        let addr = user.addr;
727        let s = String::from_utf8_lossy(data).into_owned();
728        self.base.set_string_param(reason, addr, s.clone())?;
729        self.base.call_param_callbacks(addr)?;
730        let _ = self
731            .param_change_tx
732            .try_send((reason, addr, ParamChangeValue::Octet(s)));
733        Ok(())
734    }
735
736    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
737        impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
738    }
739
740    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
741        impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
742    }
743
744    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
745        impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
746    }
747
748    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
749        impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
750    }
751
752    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
753        impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
754    }
755}
756
757/// Handle to a running plugin runtime. Provides access to sender and port handle.
758#[derive(Clone)]
759pub struct PluginRuntimeHandle {
760    port_runtime: PortRuntimeHandle,
761    array_sender: NDArraySender,
762    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
763    port_name: String,
764    pub ndarray_params: NDArrayDriverParams,
765    pub plugin_params: PluginBaseParams,
766}
767
768impl PluginRuntimeHandle {
769    pub fn port_runtime(&self) -> &PortRuntimeHandle {
770        &self.port_runtime
771    }
772
773    pub fn array_sender(&self) -> &NDArraySender {
774        &self.array_sender
775    }
776
777    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
778        &self.array_output
779    }
780
781    pub fn port_name(&self) -> &str {
782        &self.port_name
783    }
784}
785
786/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
787///
788/// Returns:
789/// - `PluginRuntimeHandle` for wiring and control
790/// - `PortRuntimeHandle` for param I/O
791/// - `JoinHandle` for the data processing thread
792pub fn create_plugin_runtime<P: NDPluginProcess>(
793    port_name: &str,
794    processor: P,
795    pool: Arc<NDArrayPool>,
796    queue_size: usize,
797    ndarray_port: &str,
798    wiring: Arc<WiringRegistry>,
799) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
800    create_plugin_runtime_multi_addr(
801        port_name,
802        processor,
803        pool,
804        queue_size,
805        ndarray_port,
806        wiring,
807        1,
808    )
809}
810
811/// Create a plugin runtime with multi-addr support.
812///
813/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
814pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
815    port_name: &str,
816    mut processor: P,
817    pool: Arc<NDArrayPool>,
818    queue_size: usize,
819    ndarray_port: &str,
820    wiring: Arc<WiringRegistry>,
821    max_addr: usize,
822) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
823    // Param change channel (control plane -> data plane)
824    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
825
826    // Capture plugin type and array data handle before mutable borrow
827    let plugin_type_name = processor.plugin_type().to_string();
828    let array_data = processor.array_data_handle();
829
830    // Create the port driver for control plane
831    let driver = PluginPortDriver::new(
832        port_name,
833        &plugin_type_name,
834        queue_size,
835        ndarray_port,
836        max_addr,
837        param_tx,
838        &mut processor,
839        array_data,
840    )
841    .expect("failed to create plugin port driver");
842
843    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
844    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
845    let ndarray_params = driver.ndarray_params;
846    let plugin_params = driver.plugin_params;
847    let std_array_data_param = driver.std_array_data_param;
848
849    // Create port runtime (actor thread for param I/O)
850    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
851
852    // Clone port handle for the data thread to write params back
853    let port_handle = port_runtime.port_handle().clone();
854
855    // Array channel (data plane)
856    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
857
858    // Shared mode flags
859    let enabled = Arc::new(AtomicBool::new(false));
860    let blocking_mode = Arc::new(AtomicBool::new(false));
861
862    // Shared processor (accessible from both data thread and caller thread)
863    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
864    let array_output_for_handle = array_output.clone();
865    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
866        processor,
867        output: array_output,
868        pool,
869        ndarray_params,
870        plugin_params,
871        port_handle,
872        array_counter: 0,
873        std_array_data_param,
874    }));
875
876    // Type-erased handle for blocking mode
877    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
878        inner: shared.clone(),
879    });
880
881    let data_enabled = enabled.clone();
882    let data_blocking = blocking_mode.clone();
883    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
884
885    // Capture wiring info for data loop
886    let nd_array_port_reason = plugin_params.nd_array_port;
887    let sender_port_name = port_name.to_string();
888    let initial_upstream = ndarray_port.to_string();
889
890    // Spawn data processing thread
891    let data_jh = thread::Builder::new()
892        .name(format!("plugin-data-{port_name}"))
893        .spawn(move || {
894            plugin_data_loop(
895                shared,
896                array_rx,
897                param_rx,
898                enable_callbacks_reason,
899                blocking_callbacks_reason,
900                data_enabled,
901                data_blocking,
902                nd_array_port_reason,
903                sender_port_name,
904                initial_upstream,
905                wiring,
906            );
907        })
908        .expect("failed to spawn plugin data thread");
909
910    let handle = PluginRuntimeHandle {
911        port_runtime,
912        array_sender,
913        array_output: array_output_for_handle,
914        port_name: port_name.to_string(),
915        ndarray_params,
916        plugin_params,
917    };
918
919    (handle, data_jh)
920}
921
922fn plugin_data_loop<P: NDPluginProcess>(
923    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
924    mut array_rx: NDArrayReceiver,
925    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
926    enable_callbacks_reason: usize,
927    blocking_callbacks_reason: usize,
928    enabled: Arc<AtomicBool>,
929    blocking_mode: Arc<AtomicBool>,
930    nd_array_port_reason: usize,
931    sender_port_name: String,
932    initial_upstream: String,
933    wiring: Arc<WiringRegistry>,
934) {
935    let mut current_upstream = initial_upstream;
936    let rt = tokio::runtime::Builder::new_current_thread()
937        .enable_all()
938        .build()
939        .unwrap();
940    rt.block_on(async {
941        loop {
942            tokio::select! {
943                msg = array_rx.recv_msg() => {
944                    match msg {
945                        Some(msg) => {
946                            // In blocking mode, arrays are processed inline by the caller.
947                            // Skip processing here to avoid double-processing.
948                            if !blocking_mode.load(Ordering::Acquire) {
949                                shared.lock().process_and_publish(&msg.array);
950                            }
951                            // msg dropped here → completion signaled (if tracked)
952                        }
953                        None => break,
954                    }
955                }
956                param = param_rx.recv() => {
957                    match param {
958                        Some((reason, addr, value)) => {
959                            if reason == enable_callbacks_reason {
960                                enabled.store(value.as_i32() != 0, Ordering::Release);
961                            }
962                            if reason == blocking_callbacks_reason {
963                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
964                            }
965                            // Handle NDArrayPort rewiring
966                            if reason == nd_array_port_reason {
967                                if let Some(new_port) = value.as_string() {
968                                    if new_port != current_upstream {
969                                        let old = std::mem::replace(&mut current_upstream, new_port.to_string());
970                                        if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
971                                            eprintln!("NDArrayPort rewire failed: {e}");
972                                            current_upstream = old;
973                                        }
974                                    }
975                                }
976                            }
977                            let snapshot = PluginParamSnapshot {
978                                enable_callbacks: enabled.load(Ordering::Acquire),
979                                reason,
980                                addr,
981                                value,
982                            };
983                            let mut guard = shared.lock();
984                            let t0 = std::time::Instant::now();
985                            let result = guard.processor.on_param_change(reason, &snapshot);
986                            let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
987                            if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
988                                guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
989                            }
990                            drop(guard);
991                        }
992                        None => break,
993                    }
994                }
995            }
996        }
997    });
998}
999
1000/// Connect a downstream plugin's sender to a plugin runtime's output.
1001pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1002    upstream.array_output().lock().add(downstream_sender);
1003}
1004
1005/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
1006pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1007    port_name: &str,
1008    mut processor: P,
1009    pool: Arc<NDArrayPool>,
1010    queue_size: usize,
1011    output: NDArrayOutput,
1012    ndarray_port: &str,
1013    wiring: Arc<WiringRegistry>,
1014) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1015    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1016
1017    let plugin_type_name = processor.plugin_type().to_string();
1018    let array_data = processor.array_data_handle();
1019    let driver = PluginPortDriver::new(
1020        port_name,
1021        &plugin_type_name,
1022        queue_size,
1023        ndarray_port,
1024        1,
1025        param_tx,
1026        &mut processor,
1027        array_data,
1028    )
1029    .expect("failed to create plugin port driver");
1030
1031    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1032    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1033    let ndarray_params = driver.ndarray_params;
1034    let plugin_params = driver.plugin_params;
1035    let std_array_data_param = driver.std_array_data_param;
1036
1037    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1038
1039    let port_handle = port_runtime.port_handle().clone();
1040
1041    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1042
1043    let enabled = Arc::new(AtomicBool::new(false));
1044    let blocking_mode = Arc::new(AtomicBool::new(false));
1045
1046    let array_output = Arc::new(parking_lot::Mutex::new(output));
1047    let array_output_for_handle = array_output.clone();
1048    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1049        processor,
1050        output: array_output,
1051        pool,
1052        ndarray_params,
1053        plugin_params,
1054        port_handle,
1055        array_counter: 0,
1056        std_array_data_param,
1057    }));
1058
1059    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1060        inner: shared.clone(),
1061    });
1062
1063    let data_enabled = enabled.clone();
1064    let data_blocking = blocking_mode.clone();
1065    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1066
1067    // Capture wiring info for data loop
1068    let nd_array_port_reason = plugin_params.nd_array_port;
1069    let sender_port_name = port_name.to_string();
1070    let initial_upstream = ndarray_port.to_string();
1071
1072    let data_jh = thread::Builder::new()
1073        .name(format!("plugin-data-{port_name}"))
1074        .spawn(move || {
1075            plugin_data_loop(
1076                shared,
1077                array_rx,
1078                param_rx,
1079                enable_callbacks_reason,
1080                blocking_callbacks_reason,
1081                data_enabled,
1082                data_blocking,
1083                nd_array_port_reason,
1084                sender_port_name,
1085                initial_upstream,
1086                wiring,
1087            );
1088        })
1089        .expect("failed to spawn plugin data thread");
1090
1091    let handle = PluginRuntimeHandle {
1092        port_runtime,
1093        array_sender,
1094        array_output: array_output_for_handle,
1095        port_name: port_name.to_string(),
1096        ndarray_params,
1097        plugin_params,
1098    };
1099
1100    (handle, data_jh)
1101}
1102
1103#[cfg(test)]
1104mod tests {
1105    use super::*;
1106    use crate::ndarray::{NDDataType, NDDimension};
1107    use crate::plugin::channel::ndarray_channel;
1108
1109    /// Passthrough processor: returns the input array as-is.
1110    struct PassthroughProcessor;
1111
1112    impl NDPluginProcess for PassthroughProcessor {
1113        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1114            ProcessResult::arrays(vec![Arc::new(array.clone())])
1115        }
1116        fn plugin_type(&self) -> &str {
1117            "Passthrough"
1118        }
1119    }
1120
1121    /// Sink processor: consumes arrays, returns nothing.
1122    struct SinkProcessor {
1123        count: usize,
1124    }
1125
1126    impl NDPluginProcess for SinkProcessor {
1127        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1128            self.count += 1;
1129            ProcessResult::empty()
1130        }
1131        fn plugin_type(&self) -> &str {
1132            "Sink"
1133        }
1134    }
1135
1136    fn make_test_array(id: i32) -> Arc<NDArray> {
1137        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1138        arr.unique_id = id;
1139        Arc::new(arr)
1140    }
1141
1142    fn test_wiring() -> Arc<WiringRegistry> {
1143        Arc::new(WiringRegistry::new())
1144    }
1145
1146    /// Enable callbacks on a plugin handle (plugins default to disabled).
1147    fn enable_callbacks(handle: &PluginRuntimeHandle) {
1148        handle
1149            .port_runtime()
1150            .port_handle()
1151            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1152            .unwrap();
1153        std::thread::sleep(std::time::Duration::from_millis(10));
1154    }
1155
1156    #[test]
1157    fn test_passthrough_runtime() {
1158        let pool = Arc::new(NDArrayPool::new(1_000_000));
1159
1160        // Create downstream receiver
1161        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1162        let mut output = NDArrayOutput::new();
1163        output.add(downstream_sender);
1164
1165        let (handle, _data_jh) = create_plugin_runtime_with_output(
1166            "PASS1",
1167            PassthroughProcessor,
1168            pool,
1169            10,
1170            output,
1171            "",
1172            test_wiring(),
1173        );
1174        enable_callbacks(&handle);
1175
1176        // Send an array
1177        handle.array_sender().send(make_test_array(42));
1178
1179        // Should come out the other side
1180        let received = downstream_rx.blocking_recv().unwrap();
1181        assert_eq!(received.unique_id, 42);
1182    }
1183
1184    #[test]
1185    fn test_sink_runtime() {
1186        let pool = Arc::new(NDArrayPool::new(1_000_000));
1187
1188        let (handle, _data_jh) = create_plugin_runtime(
1189            "SINK1",
1190            SinkProcessor { count: 0 },
1191            pool,
1192            10,
1193            "",
1194            test_wiring(),
1195        );
1196        enable_callbacks(&handle);
1197
1198        // Send arrays - they should be consumed silently
1199        handle.array_sender().send(make_test_array(1));
1200        handle.array_sender().send(make_test_array(2));
1201
1202        // Give processing thread time
1203        std::thread::sleep(std::time::Duration::from_millis(50));
1204
1205        // No crash, no output needed
1206        assert_eq!(handle.port_name(), "SINK1");
1207    }
1208
1209    #[test]
1210    fn test_plugin_type_param() {
1211        let pool = Arc::new(NDArrayPool::new(1_000_000));
1212
1213        let (handle, _data_jh) = create_plugin_runtime(
1214            "TYPE_TEST",
1215            PassthroughProcessor,
1216            pool,
1217            10,
1218            "",
1219            test_wiring(),
1220        );
1221
1222        // Verify port name
1223        assert_eq!(handle.port_name(), "TYPE_TEST");
1224        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1225    }
1226
1227    #[test]
1228    fn test_shutdown_on_handle_drop() {
1229        let pool = Arc::new(NDArrayPool::new(1_000_000));
1230
1231        let (handle, data_jh) = create_plugin_runtime(
1232            "SHUTDOWN_TEST",
1233            PassthroughProcessor,
1234            pool,
1235            10,
1236            "",
1237            test_wiring(),
1238        );
1239
1240        // Drop the handle (closes sender channel, which should cause data thread to exit)
1241        let sender = handle.array_sender().clone();
1242        drop(handle);
1243        drop(sender);
1244
1245        // Data thread should terminate
1246        let result = data_jh.join();
1247        assert!(result.is_ok());
1248    }
1249
1250    #[test]
1251    fn test_dropped_count_when_queue_full() {
1252        let pool = Arc::new(NDArrayPool::new(1_000_000));
1253
1254        // Very slow processor
1255        struct SlowProcessor;
1256        impl NDPluginProcess for SlowProcessor {
1257            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1258                std::thread::sleep(std::time::Duration::from_millis(100));
1259                ProcessResult::empty()
1260            }
1261            fn plugin_type(&self) -> &str {
1262                "Slow"
1263            }
1264        }
1265
1266        let (handle, _data_jh) =
1267            create_plugin_runtime("DROP_TEST", SlowProcessor, pool, 1, "", test_wiring());
1268        enable_callbacks(&handle);
1269
1270        // Fill the queue and overflow
1271        for i in 0..10 {
1272            handle.array_sender().send(make_test_array(i));
1273        }
1274
1275        // Some should have been dropped
1276        assert!(handle.array_sender().dropped_count() > 0);
1277    }
1278
1279    #[test]
1280    fn test_blocking_callbacks_basic() {
1281        let pool = Arc::new(NDArrayPool::new(1_000_000));
1282        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1283        let mut output = NDArrayOutput::new();
1284        output.add(downstream_sender);
1285
1286        let (handle, _data_jh) = create_plugin_runtime_with_output(
1287            "BLOCK_TEST",
1288            PassthroughProcessor,
1289            pool,
1290            10,
1291            output,
1292            "",
1293            test_wiring(),
1294        );
1295        enable_callbacks(&handle);
1296
1297        // Enable blocking mode
1298        handle
1299            .port_runtime()
1300            .port_handle()
1301            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1302            .unwrap();
1303        std::thread::sleep(std::time::Duration::from_millis(50));
1304
1305        // In blocking mode, send() processes inline and returns synchronously
1306        handle.array_sender().send(make_test_array(42));
1307
1308        // Array should already be in the downstream channel
1309        let received = downstream_rx.blocking_recv().unwrap();
1310        assert_eq!(received.unique_id, 42);
1311    }
1312
1313    #[test]
1314    fn test_blocking_to_nonblocking_switch() {
1315        let pool = Arc::new(NDArrayPool::new(1_000_000));
1316        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1317        let mut output = NDArrayOutput::new();
1318        output.add(downstream_sender);
1319
1320        let (handle, _data_jh) = create_plugin_runtime_with_output(
1321            "SWITCH_TEST",
1322            PassthroughProcessor,
1323            pool,
1324            10,
1325            output,
1326            "",
1327            test_wiring(),
1328        );
1329        enable_callbacks(&handle);
1330
1331        // Start in blocking mode
1332        handle
1333            .port_runtime()
1334            .port_handle()
1335            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1336            .unwrap();
1337        std::thread::sleep(std::time::Duration::from_millis(50));
1338
1339        handle.array_sender().send(make_test_array(1));
1340        let received = downstream_rx.blocking_recv().unwrap();
1341        assert_eq!(received.unique_id, 1);
1342
1343        // Switch back to non-blocking
1344        handle
1345            .port_runtime()
1346            .port_handle()
1347            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1348            .unwrap();
1349        std::thread::sleep(std::time::Duration::from_millis(50));
1350
1351        // Send in non-blocking mode — goes through channel to data thread
1352        handle.array_sender().send(make_test_array(2));
1353        let received = downstream_rx.blocking_recv().unwrap();
1354        assert_eq!(received.unique_id, 2);
1355    }
1356
1357    #[test]
1358    fn test_enable_callbacks_disables_processing() {
1359        let pool = Arc::new(NDArrayPool::new(1_000_000));
1360        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1361        let mut output = NDArrayOutput::new();
1362        output.add(downstream_sender);
1363
1364        let (handle, _data_jh) = create_plugin_runtime_with_output(
1365            "ENABLE_TEST",
1366            PassthroughProcessor,
1367            pool,
1368            10,
1369            output,
1370            "",
1371            test_wiring(),
1372        );
1373
1374        // Disable callbacks
1375        handle
1376            .port_runtime()
1377            .port_handle()
1378            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1379            .unwrap();
1380        std::thread::sleep(std::time::Duration::from_millis(50));
1381
1382        // Send array — should be silently dropped by sender
1383        handle.array_sender().send(make_test_array(99));
1384
1385        // Verify nothing received (with timeout)
1386        let rt = tokio::runtime::Builder::new_current_thread()
1387            .enable_all()
1388            .build()
1389            .unwrap();
1390        let result = rt.block_on(async {
1391            tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1392        });
1393        assert!(
1394            result.is_err(),
1395            "should not receive array when callbacks disabled"
1396        );
1397    }
1398
1399    #[test]
1400    fn test_blocking_downstream_receives() {
1401        let pool = Arc::new(NDArrayPool::new(1_000_000));
1402
1403        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1404        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1405        let mut output = NDArrayOutput::new();
1406        output.add(ds1);
1407        output.add(ds2);
1408
1409        let (handle, _data_jh) = create_plugin_runtime_with_output(
1410            "BLOCK_DS_TEST",
1411            PassthroughProcessor,
1412            pool,
1413            10,
1414            output,
1415            "",
1416            test_wiring(),
1417        );
1418        enable_callbacks(&handle);
1419
1420        // Enable blocking mode
1421        handle
1422            .port_runtime()
1423            .port_handle()
1424            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1425            .unwrap();
1426        std::thread::sleep(std::time::Duration::from_millis(50));
1427
1428        handle.array_sender().send(make_test_array(77));
1429
1430        // Both downstream receivers should have the array
1431        let r1 = rx1.blocking_recv().unwrap();
1432        let r2 = rx2.blocking_recv().unwrap();
1433        assert_eq!(r1.unique_id, 77);
1434        assert_eq!(r2.unique_id, 77);
1435    }
1436
1437    #[test]
1438    fn test_blocking_param_updates() {
1439        let pool = Arc::new(NDArrayPool::new(1_000_000));
1440
1441        struct ParamTracker;
1442        impl NDPluginProcess for ParamTracker {
1443            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1444                ProcessResult::arrays(vec![Arc::new(array.clone())])
1445            }
1446            fn plugin_type(&self) -> &str {
1447                "ParamTracker"
1448            }
1449        }
1450
1451        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1452        let mut output = NDArrayOutput::new();
1453        output.add(downstream_sender);
1454
1455        let (handle, _data_jh) = create_plugin_runtime_with_output(
1456            "PARAM_TEST",
1457            ParamTracker,
1458            pool,
1459            10,
1460            output,
1461            "",
1462            test_wiring(),
1463        );
1464        enable_callbacks(&handle);
1465
1466        // Enable blocking mode
1467        handle
1468            .port_runtime()
1469            .port_handle()
1470            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1471            .unwrap();
1472        std::thread::sleep(std::time::Duration::from_millis(50));
1473
1474        // Send array in blocking mode
1475        handle.array_sender().send(make_test_array(1));
1476        let received = downstream_rx.blocking_recv().unwrap();
1477        assert_eq!(received.unique_id, 1);
1478
1479        // Write enable_callbacks while in blocking mode — should not crash
1480        handle
1481            .port_runtime()
1482            .port_handle()
1483            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1484            .unwrap();
1485        std::thread::sleep(std::time::Duration::from_millis(50));
1486
1487        // Still works after param update
1488        handle.array_sender().send(make_test_array(2));
1489        let received = downstream_rx.blocking_recv().unwrap();
1490        assert_eq!(received.unique_id, 2);
1491    }
1492
1493    /// Phase 0 regression test: process_and_publish inside a current-thread runtime must not panic.
1494    #[test]
1495    fn test_no_panic_in_current_thread_runtime() {
1496        let pool = Arc::new(NDArrayPool::new(1_000_000));
1497        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1498        let mut output = NDArrayOutput::new();
1499        output.add(downstream_sender);
1500
1501        let (handle, _data_jh) = create_plugin_runtime_with_output(
1502            "CURRENT_THREAD_TEST",
1503            PassthroughProcessor,
1504            pool,
1505            10,
1506            output,
1507            "",
1508            test_wiring(),
1509        );
1510        enable_callbacks(&handle);
1511
1512        // Enable blocking mode so process_and_publish runs inline
1513        handle
1514            .port_runtime()
1515            .port_handle()
1516            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1517            .unwrap();
1518        std::thread::sleep(std::time::Duration::from_millis(50));
1519
1520        // Call send (which calls process_and_publish inline) from inside a current-thread runtime
1521        let rt = tokio::runtime::Builder::new_current_thread()
1522            .enable_all()
1523            .build()
1524            .unwrap();
1525        rt.block_on(async {
1526            handle.array_sender().send(make_test_array(99));
1527        });
1528
1529        let received = downstream_rx.blocking_recv().unwrap();
1530        assert_eq!(received.unique_id, 99);
1531    }
1532}