Skip to main content

ad_core_rs/plugin/
runtime.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{
19    BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
20};
21use super::params::PluginBaseParams;
22use super::wiring::WiringRegistry;
23
24/// Value sent through the param change channel from control plane to data plane.
25#[derive(Debug, Clone)]
26pub enum ParamChangeValue {
27    Int32(i32),
28    Float64(f64),
29    Octet(String),
30}
31
32impl ParamChangeValue {
33    pub fn as_i32(&self) -> i32 {
34        match self {
35            ParamChangeValue::Int32(v) => *v,
36            ParamChangeValue::Float64(v) => *v as i32,
37            ParamChangeValue::Octet(_) => 0,
38        }
39    }
40
41    pub fn as_f64(&self) -> f64 {
42        match self {
43            ParamChangeValue::Int32(v) => *v as f64,
44            ParamChangeValue::Float64(v) => *v,
45            ParamChangeValue::Octet(_) => 0.0,
46        }
47    }
48
49    pub fn as_string(&self) -> Option<&str> {
50        match self {
51            ParamChangeValue::Octet(s) => Some(s),
52            _ => None,
53        }
54    }
55}
56
57/// A single parameter update produced by a plugin's process_array.
58pub enum ParamUpdate {
59    Int32 {
60        reason: usize,
61        addr: i32,
62        value: i32,
63    },
64    Float64 {
65        reason: usize,
66        addr: i32,
67        value: f64,
68    },
69    Octet {
70        reason: usize,
71        addr: i32,
72        value: String,
73    },
74    Float64Array {
75        reason: usize,
76        addr: i32,
77        value: Vec<f64>,
78    },
79}
80
81impl ParamUpdate {
82    /// Create an Int32 update at addr 0.
83    pub fn int32(reason: usize, value: i32) -> Self {
84        Self::Int32 {
85            reason,
86            addr: 0,
87            value,
88        }
89    }
90    /// Create a Float64 update at addr 0.
91    pub fn float64(reason: usize, value: f64) -> Self {
92        Self::Float64 {
93            reason,
94            addr: 0,
95            value,
96        }
97    }
98    /// Create an Int32 update at a specific addr.
99    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
100        Self::Int32 {
101            reason,
102            addr,
103            value,
104        }
105    }
106    /// Create a Float64 update at a specific addr.
107    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
108        Self::Float64 {
109            reason,
110            addr,
111            value,
112        }
113    }
114    /// Create a Float64Array update at addr 0.
115    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
116        Self::Float64Array {
117            reason,
118            addr: 0,
119            value,
120        }
121    }
122    /// Create a Float64Array update at a specific addr.
123    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
124        Self::Float64Array {
125            reason,
126            addr,
127            value,
128        }
129    }
130}
131
132/// Result of processing one array: output arrays + param updates to write back.
133pub struct ProcessResult {
134    pub output_arrays: Vec<Arc<NDArray>>,
135    pub param_updates: Vec<ParamUpdate>,
136    /// If set, only publish to the subscriber at this index (round-robin scatter).
137    pub scatter_index: Option<usize>,
138}
139
140impl ProcessResult {
141    /// Convenience: sink plugin with only param updates, no output arrays.
142    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
143        Self {
144            output_arrays: vec![],
145            param_updates,
146            scatter_index: None,
147        }
148    }
149
150    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
151    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
152        Self {
153            output_arrays,
154            param_updates: vec![],
155            scatter_index: None,
156        }
157    }
158
159    /// Convenience: no outputs, no param updates.
160    pub fn empty() -> Self {
161        Self {
162            output_arrays: vec![],
163            param_updates: vec![],
164            scatter_index: None,
165        }
166    }
167
168    /// Convenience: scatter output — send to a single subscriber by index.
169    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
170        Self {
171            output_arrays,
172            param_updates: vec![],
173            scatter_index: Some(index),
174        }
175    }
176}
177
178/// Result of handling a control-plane param change.
179pub struct ParamChangeResult {
180    pub output_arrays: Vec<Arc<NDArray>>,
181    pub param_updates: Vec<ParamUpdate>,
182}
183
184impl ParamChangeResult {
185    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
186        Self {
187            output_arrays: vec![],
188            param_updates,
189        }
190    }
191
192    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
193        Self {
194            output_arrays,
195            param_updates: vec![],
196        }
197    }
198
199    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
200        Self {
201            output_arrays,
202            param_updates,
203        }
204    }
205
206    pub fn empty() -> Self {
207        Self {
208            output_arrays: vec![],
209            param_updates: vec![],
210        }
211    }
212}
213
214/// Pure processing logic. No threading concerns.
215pub trait NDPluginProcess: Send + 'static {
216    /// Process one array. Return output arrays and param updates.
217    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
218
219    /// Plugin type name for PLUGIN_TYPE param.
220    fn plugin_type(&self) -> &str;
221
222    /// Register plugin-specific params on the base. Called once during construction.
223    fn register_params(
224        &mut self,
225        _base: &mut PortDriverBase,
226    ) -> Result<(), asyn_rs::error::AsynError> {
227        Ok(())
228    }
229
230    /// Called when a param changes. Reason is the param index.
231    /// Return param updates to be written back to the port driver.
232    fn on_param_change(
233        &mut self,
234        _reason: usize,
235        _params: &PluginParamSnapshot,
236    ) -> ParamChangeResult {
237        ParamChangeResult::empty()
238    }
239
240    /// Return a handle to the latest NDArray data for array reads.
241    /// Override this in plugins like NDPluginStdArrays that serve pixel data
242    /// via readInt8Array/readInt16Array/etc.
243    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
244        None
245    }
246}
247
248/// Read-only snapshot of param values available to the processing thread.
249pub struct PluginParamSnapshot {
250    pub enable_callbacks: bool,
251    /// The param reason that changed.
252    pub reason: usize,
253    /// The address (sub-device) that changed.
254    pub addr: i32,
255    /// The new value.
256    pub value: ParamChangeValue,
257}
258
259/// Sort buffer for reordering output arrays by uniqueId.
260///
261/// When sort_mode is enabled, output arrays are inserted into a BTreeMap
262/// keyed by uniqueId instead of being sent directly. A periodic flush task
263/// drains arrays in uniqueId order.
264struct SortBuffer {
265    /// Buffered arrays keyed by uniqueId, ordered by BTreeMap.
266    entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
267    /// The last uniqueId that was emitted (for detecting disordered arrays).
268    last_emitted_id: i32,
269    /// Counter of arrays received out of order (uniqueId < last_emitted_id).
270    disordered_arrays: i32,
271    /// Counter of arrays dropped because the buffer was full.
272    dropped_output_arrays: i32,
273}
274
275impl SortBuffer {
276    fn new() -> Self {
277        Self {
278            entries: BTreeMap::new(),
279            last_emitted_id: 0,
280            disordered_arrays: 0,
281            dropped_output_arrays: 0,
282        }
283    }
284
285    /// Insert arrays into the sort buffer. If buffer exceeds sort_size, drop oldest entries.
286    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
287        if unique_id < self.last_emitted_id {
288            self.disordered_arrays += 1;
289        }
290        self.entries.entry(unique_id).or_default().extend(arrays);
291
292        // Enforce sort_size limit by dropping oldest entries
293        while sort_size > 0 && self.entries.len() as i32 > sort_size {
294            if let Some((&oldest_key, _)) = self.entries.iter().next() {
295                self.entries.remove(&oldest_key);
296                self.dropped_output_arrays += 1;
297            }
298        }
299    }
300
301    /// Drain all buffered arrays in uniqueId order. Returns them as (uniqueId, arrays) pairs.
302    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
303        let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
304        if let Some(&(last_id, _)) = entries.last() {
305            self.last_emitted_id = last_id;
306        }
307        entries
308    }
309
310    /// Number of uniqueId entries currently buffered.
311    fn len(&self) -> i32 {
312        self.entries.len() as i32
313    }
314}
315
316/// Shared processor state protected by a mutex, accessible from both
317/// the data thread (non-blocking mode) and the caller thread (blocking mode).
318struct SharedProcessorInner<P: NDPluginProcess> {
319    processor: P,
320    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
321    pool: Arc<NDArrayPool>,
322    ndarray_params: NDArrayDriverParams,
323    plugin_params: PluginBaseParams,
324    port_handle: PortHandle,
325    array_counter: i32,
326    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
327    std_array_data_param: Option<usize>,
328    /// MinCallbackTime throttling: minimum seconds between process calls.
329    min_callback_time: f64,
330    /// Last time process_and_publish was called (for throttling).
331    last_process_time: Option<std::time::Instant>,
332    /// Sort mode: 0 = disabled, 1 = sorted output.
333    sort_mode: i32,
334    /// Sort time: seconds between periodic flushes of the sort buffer.
335    sort_time: f64,
336    /// Sort size: maximum number of uniqueId entries in the sort buffer.
337    sort_size: i32,
338    /// Sort buffer for reordering output arrays by uniqueId.
339    sort_buffer: SortBuffer,
340    /// Rate tracking: counter value at last rate calculation.
341    rate_last_counter: i32,
342    /// Rate tracking: time of last rate calculation.
343    rate_last_time: std::time::Instant,
344}
345
346impl<P: NDPluginProcess> SharedProcessorInner<P> {
347    fn should_throttle(&self) -> bool {
348        if self.min_callback_time <= 0.0 {
349            return false;
350        }
351        if let Some(last) = self.last_process_time {
352            last.elapsed().as_secs_f64() < self.min_callback_time
353        } else {
354            false
355        }
356    }
357
358    fn process_and_publish(&mut self, array: &NDArray) {
359        if self.should_throttle() {
360            return;
361        }
362        let t0 = std::time::Instant::now();
363        let result = self.processor.process_array(array, &self.pool);
364        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
365        self.last_process_time = Some(t0);
366
367        if self.sort_mode != 0 && !result.output_arrays.is_empty() {
368            // Insert into sort buffer instead of publishing directly
369            let unique_id = array.unique_id;
370            self.sort_buffer
371                .insert(unique_id, result.output_arrays, self.sort_size);
372            // Update sort stats params
373            self.update_sort_params();
374            // Still publish param updates immediately
375            if !result.param_updates.is_empty() {
376                self.publish_result(
377                    vec![],
378                    result.param_updates,
379                    result.scatter_index,
380                    Some(array),
381                    elapsed_ms,
382                );
383            }
384        } else {
385            self.publish_result(
386                result.output_arrays,
387                result.param_updates,
388                result.scatter_index,
389                Some(array),
390                elapsed_ms,
391            );
392        }
393    }
394
395    /// Flush the sort buffer: drain all arrays in uniqueId order and publish them.
396    fn flush_sort_buffer(&mut self) {
397        let entries = self.sort_buffer.drain_all();
398        for (_unique_id, arrays) in entries {
399            self.publish_result(arrays, vec![], None, None, 0.0);
400        }
401        self.update_sort_params();
402    }
403
404    /// Update sort-related param values (SortFree, DisorderedArrays, DroppedOutputArrays).
405    fn update_sort_params(&self) {
406        let sort_free = self.sort_size - self.sort_buffer.len();
407        self.port_handle
408            .write_int32_no_wait(self.plugin_params.sort_free, 0, sort_free);
409        self.port_handle.write_int32_no_wait(
410            self.plugin_params.disordered_arrays,
411            0,
412            self.sort_buffer.disordered_arrays,
413        );
414        self.port_handle.write_int32_no_wait(
415            self.plugin_params.dropped_output_arrays,
416            0,
417            self.sort_buffer.dropped_output_arrays,
418        );
419    }
420
421    fn publish_result(
422        &mut self,
423        output_arrays: Vec<Arc<NDArray>>,
424        param_updates: Vec<ParamUpdate>,
425        scatter_index: Option<usize>,
426        fallback_array: Option<&NDArray>,
427        elapsed_ms: f64,
428    ) {
429        let output = self.output.lock();
430        for out in &output_arrays {
431            if let Some(idx) = scatter_index {
432                output.publish_to(idx, out.clone());
433            } else {
434                output.publish(out.clone());
435            }
436        }
437        drop(output);
438
439        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
440            self.array_counter += 1;
441
442            // Fire array data interrupt directly (C EPICS pattern).
443            // Bypasses port actor channel to avoid dropping large array messages.
444            if let Some(param) = self.std_array_data_param {
445                use crate::ndarray::NDDataBuffer;
446                use asyn_rs::param::ParamValue;
447                let value = match &report_arr.data {
448                    NDDataBuffer::I8(v) => {
449                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
450                    }
451                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
452                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
453                    ))),
454                    NDDataBuffer::I16(v) => {
455                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
456                    }
457                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
458                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
459                    ))),
460                    NDDataBuffer::I32(v) => {
461                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
462                    }
463                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
464                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
465                    ))),
466                    NDDataBuffer::I64(v) => {
467                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
468                    }
469                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
470                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
471                    ))),
472                    NDDataBuffer::F32(v) => {
473                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
474                    }
475                    NDDataBuffer::F64(v) => {
476                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
477                    }
478                };
479                if let Some(value) = value {
480                    let ts = report_arr.timestamp.to_system_time();
481                    self.port_handle
482                        .interrupts()
483                        .notify(asyn_rs::interrupt::InterruptValue {
484                            reason: param,
485                            addr: 0,
486                            value,
487                            timestamp: ts,
488                            uint32_changed_mask: 0,
489                        });
490                }
491            }
492
493            let info = report_arr.info();
494            let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
495            self.port_handle.write_int32_no_wait(
496                self.ndarray_params.array_counter,
497                0,
498                self.array_counter,
499            );
500            self.port_handle.write_int32_no_wait(
501                self.ndarray_params.unique_id,
502                0,
503                report_arr.unique_id,
504            );
505            self.port_handle.write_int32_no_wait(
506                self.ndarray_params.n_dimensions,
507                0,
508                report_arr.dims.len() as i32,
509            );
510            self.port_handle.write_int32_no_wait(
511                self.ndarray_params.array_size_x,
512                0,
513                info.x_size as i32,
514            );
515            self.port_handle.write_int32_no_wait(
516                self.ndarray_params.array_size_y,
517                0,
518                info.y_size as i32,
519            );
520            self.port_handle.write_int32_no_wait(
521                self.ndarray_params.array_size_z,
522                0,
523                info.color_size as i32,
524            );
525            self.port_handle.write_int32_no_wait(
526                self.ndarray_params.array_size,
527                0,
528                info.total_bytes as i32,
529            );
530            self.port_handle.write_int32_no_wait(
531                self.ndarray_params.data_type,
532                0,
533                report_arr.data.data_type() as i32,
534            );
535            self.port_handle
536                .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
537
538            let ts_f64 = report_arr.timestamp.as_f64();
539            self.port_handle
540                .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
541            self.port_handle.write_int32_no_wait(
542                self.ndarray_params.epics_ts_sec,
543                0,
544                report_arr.timestamp.sec as i32,
545            );
546            self.port_handle.write_int32_no_wait(
547                self.ndarray_params.epics_ts_nsec,
548                0,
549                report_arr.timestamp.nsec as i32,
550            );
551        }
552
553        self.port_handle
554            .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
555
556        // Compute array rate (Hz) based on counter delta over elapsed time.
557        let now = std::time::Instant::now();
558        let dt = now.duration_since(self.rate_last_time).as_secs_f64();
559        if dt >= 1.0 {
560            let delta = self.array_counter - self.rate_last_counter;
561            let rate = if delta > 0 { delta as f64 / dt } else { 0.0 };
562            self.rate_last_counter = self.array_counter;
563            self.rate_last_time = now;
564            self.port_handle
565                .write_float64_no_wait(self.ndarray_params.array_rate, 0, rate);
566        }
567
568        // Set params directly and fire callbacks — no writeInt32/on_param_change re-entrancy.
569        // This mirrors C ADCore's setIntegerParam + callParamCallbacks pattern.
570        use asyn_rs::request::ParamSetValue;
571
572        let mut addr0_updates: Vec<ParamSetValue> = Vec::new();
573        let mut extra_addr_map: std::collections::HashMap<i32, Vec<ParamSetValue>> =
574            std::collections::HashMap::new();
575
576        for update in &param_updates {
577            match update {
578                ParamUpdate::Int32 {
579                    reason,
580                    addr,
581                    value,
582                } => {
583                    let pv = ParamSetValue::Int32 {
584                        reason: *reason,
585                        addr: *addr,
586                        value: *value,
587                    };
588                    if *addr == 0 {
589                        addr0_updates.push(pv);
590                    } else {
591                        extra_addr_map.entry(*addr).or_default().push(pv);
592                    }
593                }
594                ParamUpdate::Float64 {
595                    reason,
596                    addr,
597                    value,
598                } => {
599                    let pv = ParamSetValue::Float64 {
600                        reason: *reason,
601                        addr: *addr,
602                        value: *value,
603                    };
604                    if *addr == 0 {
605                        addr0_updates.push(pv);
606                    } else {
607                        extra_addr_map.entry(*addr).or_default().push(pv);
608                    }
609                }
610                ParamUpdate::Octet {
611                    reason,
612                    addr,
613                    value,
614                } => {
615                    let pv = ParamSetValue::Octet {
616                        reason: *reason,
617                        addr: *addr,
618                        value: value.clone(),
619                    };
620                    if *addr == 0 {
621                        addr0_updates.push(pv);
622                    } else {
623                        extra_addr_map.entry(*addr).or_default().push(pv);
624                    }
625                }
626                ParamUpdate::Float64Array {
627                    reason,
628                    addr,
629                    value,
630                } => {
631                    let pv = ParamSetValue::Float64Array {
632                        reason: *reason,
633                        addr: *addr,
634                        value: value.clone(),
635                    };
636                    if *addr == 0 {
637                        addr0_updates.push(pv);
638                    } else {
639                        extra_addr_map.entry(*addr).or_default().push(pv);
640                    }
641                }
642            }
643        }
644
645        self.port_handle.set_params_and_notify(0, addr0_updates);
646        for (addr, updates) in extra_addr_map {
647            self.port_handle.set_params_and_notify(addr, updates);
648        }
649    }
650}
651
652/// Type-erased handle for blocking mode: allows NDArraySender to call
653/// process_and_publish without knowing the concrete processor type.
654struct BlockingProcessorHandle<P: NDPluginProcess> {
655    inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
656}
657
658impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
659    fn process_and_publish(&self, array: &NDArray) {
660        self.inner.lock().process_and_publish(array);
661    }
662}
663
664/// PortDriver implementation for a plugin's control plane.
665#[allow(dead_code)]
666pub struct PluginPortDriver {
667    base: PortDriverBase,
668    ndarray_params: NDArrayDriverParams,
669    plugin_params: PluginBaseParams,
670    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
671    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
672    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
673    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
674    std_array_data_param: Option<usize>,
675}
676
677impl PluginPortDriver {
678    fn new<P: NDPluginProcess>(
679        port_name: &str,
680        plugin_type_name: &str,
681        queue_size: usize,
682        ndarray_port: &str,
683        max_addr: usize,
684        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
685        processor: &mut P,
686        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
687    ) -> AsynResult<Self> {
688        let mut base = PortDriverBase::new(
689            port_name,
690            max_addr,
691            PortFlags {
692                can_block: true,
693                ..Default::default()
694            },
695        );
696
697        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
698        let plugin_params = PluginBaseParams::create(&mut base)?;
699
700        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
701        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
702        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
703        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
704        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
705        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
706        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
707        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
708        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
709        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
710        base.set_int32_param(ndarray_params.capture, 0, 0)?;
711        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
712        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
713        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
714        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
715        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
716        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
717        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
718        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
719        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
720        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
721
722        // Set plugin identity params
723        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
724        base.set_string_param(
725            ndarray_params.ad_core_version,
726            0,
727            env!("CARGO_PKG_VERSION").into(),
728        )?;
729        base.set_string_param(
730            ndarray_params.driver_version,
731            0,
732            env!("CARGO_PKG_VERSION").into(),
733        )?;
734        if !ndarray_port.is_empty() {
735            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
736        }
737
738        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
739        let std_array_data_param = if array_data.is_some() {
740            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
741        } else {
742            None
743        };
744
745        // Let the processor register its plugin-specific params
746        processor.register_params(&mut base)?;
747
748        Ok(Self {
749            base,
750            ndarray_params,
751            plugin_params,
752            param_change_tx,
753            array_data,
754            std_array_data_param,
755        })
756    }
757}
758
759/// Copy source slice directly into destination buffer, returning elements copied.
760fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
761    let n = src.len().min(dst.len());
762    dst[..n].copy_from_slice(&src[..n]);
763    n
764}
765
766/// Convert and copy source slice into destination buffer element-by-element.
767fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
768where
769    S: CastToF64 + Copy,
770    D: CastFromF64 + Copy,
771{
772    let n = src.len().min(dst.len());
773    for i in 0..n {
774        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
775    }
776    n
777}
778
779/// Helper trait for `as f64` casts (handles lossy conversions like i64/u64).
780trait CastToF64 {
781    fn cast_to_f64(self) -> f64;
782}
783
784impl CastToF64 for i8 {
785    fn cast_to_f64(self) -> f64 {
786        self as f64
787    }
788}
789impl CastToF64 for u8 {
790    fn cast_to_f64(self) -> f64 {
791        self as f64
792    }
793}
794impl CastToF64 for i16 {
795    fn cast_to_f64(self) -> f64 {
796        self as f64
797    }
798}
799impl CastToF64 for u16 {
800    fn cast_to_f64(self) -> f64 {
801        self as f64
802    }
803}
804impl CastToF64 for i32 {
805    fn cast_to_f64(self) -> f64 {
806        self as f64
807    }
808}
809impl CastToF64 for u32 {
810    fn cast_to_f64(self) -> f64 {
811        self as f64
812    }
813}
814impl CastToF64 for i64 {
815    fn cast_to_f64(self) -> f64 {
816        self as f64
817    }
818}
819impl CastToF64 for u64 {
820    fn cast_to_f64(self) -> f64 {
821        self as f64
822    }
823}
824impl CastToF64 for f32 {
825    fn cast_to_f64(self) -> f64 {
826        self as f64
827    }
828}
829impl CastToF64 for f64 {
830    fn cast_to_f64(self) -> f64 {
831        self
832    }
833}
834
835/// Helper trait for `as` casts from f64.
836trait CastFromF64 {
837    fn cast_from_f64(v: f64) -> Self;
838}
839
840impl CastFromF64 for i8 {
841    fn cast_from_f64(v: f64) -> Self {
842        v as i8
843    }
844}
845impl CastFromF64 for i16 {
846    fn cast_from_f64(v: f64) -> Self {
847        v as i16
848    }
849}
850impl CastFromF64 for i32 {
851    fn cast_from_f64(v: f64) -> Self {
852        v as i32
853    }
854}
855impl CastFromF64 for f32 {
856    fn cast_from_f64(v: f64) -> Self {
857        v as f32
858    }
859}
860impl CastFromF64 for f64 {
861    fn cast_from_f64(v: f64) -> Self {
862        v
863    }
864}
865
866/// Copy NDArray data into the output buffer with type conversion.
867/// Returns the number of elements copied, or 0 if no data is available.
868macro_rules! impl_read_array {
869    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
870        use crate::ndarray::NDDataBuffer;
871        let handle = match &$self.array_data {
872            Some(h) => h,
873            None => return Ok(0),
874        };
875        let guard = handle.lock();
876        let array = match &*guard {
877            Some(a) => a,
878            None => return Ok(0),
879        };
880        let n = match &array.data {
881            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
882            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
883        };
884        Ok(n)
885    }};
886}
887
888impl PortDriver for PluginPortDriver {
889    fn base(&self) -> &PortDriverBase {
890        &self.base
891    }
892
893    fn base_mut(&mut self) -> &mut PortDriverBase {
894        &mut self.base
895    }
896
897    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
898        let reason = user.reason;
899        let addr = user.addr;
900        self.base.set_int32_param(reason, addr, value)?;
901        self.base.call_param_callbacks(addr)?;
902        let _ = self
903            .param_change_tx
904            .try_send((reason, addr, ParamChangeValue::Int32(value)));
905        Ok(())
906    }
907
908    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
909        let reason = user.reason;
910        let addr = user.addr;
911        self.base.set_float64_param(reason, addr, value)?;
912        self.base.call_param_callbacks(addr)?;
913        let _ = self
914            .param_change_tx
915            .try_send((reason, addr, ParamChangeValue::Float64(value)));
916        Ok(())
917    }
918
919    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
920        let reason = user.reason;
921        let addr = user.addr;
922        let s = String::from_utf8_lossy(data).into_owned();
923        self.base.set_string_param(reason, addr, s.clone())?;
924        self.base.call_param_callbacks(addr)?;
925        let _ = self
926            .param_change_tx
927            .try_send((reason, addr, ParamChangeValue::Octet(s)));
928        Ok(())
929    }
930
931    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
932        impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
933    }
934
935    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
936        impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
937    }
938
939    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
940        impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
941    }
942
943    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
944        impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
945    }
946
947    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
948        impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
949    }
950}
951
952/// Handle to a running plugin runtime. Provides access to sender and port handle.
953#[derive(Clone)]
954pub struct PluginRuntimeHandle {
955    port_runtime: PortRuntimeHandle,
956    array_sender: NDArraySender,
957    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
958    port_name: String,
959    pub ndarray_params: NDArrayDriverParams,
960    pub plugin_params: PluginBaseParams,
961}
962
963impl PluginRuntimeHandle {
964    pub fn port_runtime(&self) -> &PortRuntimeHandle {
965        &self.port_runtime
966    }
967
968    pub fn array_sender(&self) -> &NDArraySender {
969        &self.array_sender
970    }
971
972    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
973        &self.array_output
974    }
975
976    pub fn port_name(&self) -> &str {
977        &self.port_name
978    }
979}
980
981/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
982///
983/// Returns:
984/// - `PluginRuntimeHandle` for wiring and control
985/// - `PortRuntimeHandle` for param I/O
986/// - `JoinHandle` for the data processing thread
987pub fn create_plugin_runtime<P: NDPluginProcess>(
988    port_name: &str,
989    processor: P,
990    pool: Arc<NDArrayPool>,
991    queue_size: usize,
992    ndarray_port: &str,
993    wiring: Arc<WiringRegistry>,
994) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
995    create_plugin_runtime_multi_addr(
996        port_name,
997        processor,
998        pool,
999        queue_size,
1000        ndarray_port,
1001        wiring,
1002        1,
1003    )
1004}
1005
1006/// Create a plugin runtime with multi-addr support.
1007///
1008/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
1009pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1010    port_name: &str,
1011    mut processor: P,
1012    pool: Arc<NDArrayPool>,
1013    queue_size: usize,
1014    ndarray_port: &str,
1015    wiring: Arc<WiringRegistry>,
1016    max_addr: usize,
1017) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1018    // Param change channel (control plane -> data plane)
1019    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1020
1021    // Capture plugin type and array data handle before mutable borrow
1022    let plugin_type_name = processor.plugin_type().to_string();
1023    let array_data = processor.array_data_handle();
1024
1025    // Create the port driver for control plane
1026    let driver = PluginPortDriver::new(
1027        port_name,
1028        &plugin_type_name,
1029        queue_size,
1030        ndarray_port,
1031        max_addr,
1032        param_tx,
1033        &mut processor,
1034        array_data,
1035    )
1036    .expect("failed to create plugin port driver");
1037
1038    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1039    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1040    let min_callback_time_reason = driver.plugin_params.min_callback_time;
1041    let sort_mode_reason = driver.plugin_params.sort_mode;
1042    let sort_time_reason = driver.plugin_params.sort_time;
1043    let sort_size_reason = driver.plugin_params.sort_size;
1044    let ndarray_params = driver.ndarray_params;
1045    let plugin_params = driver.plugin_params;
1046    let std_array_data_param = driver.std_array_data_param;
1047
1048    // Create port runtime (actor thread for param I/O)
1049    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1050
1051    // Clone port handle for the data thread to write params back
1052    let port_handle = port_runtime.port_handle().clone();
1053
1054    // Array channel (data plane)
1055    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1056
1057    // Shared mode flags
1058    let enabled = Arc::new(AtomicBool::new(false));
1059    let blocking_mode = Arc::new(AtomicBool::new(false));
1060
1061    // Shared processor (accessible from both data thread and caller thread)
1062    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1063    let array_output_for_handle = array_output.clone();
1064    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1065        processor,
1066        output: array_output,
1067        pool,
1068        ndarray_params,
1069        plugin_params,
1070        port_handle,
1071        array_counter: 0,
1072        std_array_data_param,
1073        min_callback_time: 0.0,
1074        last_process_time: None,
1075        sort_mode: 0,
1076        sort_time: 0.0,
1077        sort_size: 10,
1078        sort_buffer: SortBuffer::new(),
1079        rate_last_counter: 0,
1080        rate_last_time: std::time::Instant::now(),
1081    }));
1082
1083    // Type-erased handle for blocking mode
1084    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1085        inner: shared.clone(),
1086    });
1087
1088    let data_enabled = enabled.clone();
1089    let data_blocking = blocking_mode.clone();
1090
1091    // Capture queue metrics before with_blocking_support consumes the sender
1092    let dropped_count = array_sender.dropped_count_shared();
1093    let queue_tx = array_sender.tx_clone();
1094
1095    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1096
1097    // Capture wiring info for data loop
1098    let nd_array_port_reason = plugin_params.nd_array_port;
1099    let sender_port_name = port_name.to_string();
1100    let initial_upstream = ndarray_port.to_string();
1101
1102    // Spawn data processing thread
1103    let data_jh = thread::Builder::new()
1104        .name(format!("plugin-data-{port_name}"))
1105        .spawn(move || {
1106            plugin_data_loop(
1107                shared,
1108                array_rx,
1109                param_rx,
1110                enable_callbacks_reason,
1111                blocking_callbacks_reason,
1112                min_callback_time_reason,
1113                sort_mode_reason,
1114                sort_time_reason,
1115                sort_size_reason,
1116                data_enabled,
1117                data_blocking,
1118                nd_array_port_reason,
1119                sender_port_name,
1120                initial_upstream,
1121                wiring,
1122                dropped_count,
1123                queue_tx,
1124            );
1125        })
1126        .expect("failed to spawn plugin data thread");
1127
1128    let handle = PluginRuntimeHandle {
1129        port_runtime,
1130        array_sender,
1131        array_output: array_output_for_handle,
1132        port_name: port_name.to_string(),
1133        ndarray_params,
1134        plugin_params,
1135    };
1136
1137    (handle, data_jh)
1138}
1139
1140fn plugin_data_loop<P: NDPluginProcess>(
1141    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1142    mut array_rx: NDArrayReceiver,
1143    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1144    enable_callbacks_reason: usize,
1145    blocking_callbacks_reason: usize,
1146    min_callback_time_reason: usize,
1147    sort_mode_reason: usize,
1148    sort_time_reason: usize,
1149    sort_size_reason: usize,
1150    enabled: Arc<AtomicBool>,
1151    blocking_mode: Arc<AtomicBool>,
1152    nd_array_port_reason: usize,
1153    sender_port_name: String,
1154    initial_upstream: String,
1155    wiring: Arc<WiringRegistry>,
1156    dropped_count: Arc<std::sync::atomic::AtomicU64>,
1157    queue_tx: tokio::sync::mpsc::Sender<super::channel::ArrayMessage>,
1158) {
1159    let mut current_upstream = initial_upstream;
1160    let rt = tokio::runtime::Builder::new_current_thread()
1161        .enable_all()
1162        .build()
1163        .unwrap();
1164    rt.block_on(async {
1165        // Sort flush timer — starts disabled (very long interval).
1166        // Re-created when sort_time changes.
1167        let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1168        let mut sort_flush_active = false;
1169
1170        loop {
1171            tokio::select! {
1172                msg = array_rx.recv_msg() => {
1173                    match msg {
1174                        Some(msg) => {
1175                            // In blocking mode, arrays are processed inline by the caller.
1176                            // Skip processing here to avoid double-processing.
1177                            if !blocking_mode.load(Ordering::Acquire) {
1178                                shared.lock().process_and_publish(&msg.array);
1179                            }
1180                            // msg dropped here → completion signaled (if tracked)
1181
1182                            // Update queue metrics (C parity: DroppedArrays + QueueFree)
1183                            let guard = shared.lock();
1184                            let queue_free = queue_tx.capacity() as i32;
1185                            let dropped = dropped_count.load(Ordering::Relaxed) as i32;
1186                            guard.port_handle.write_int32_no_wait(
1187                                guard.plugin_params.queue_use, 0, queue_free,
1188                            );
1189                            guard.port_handle.write_int32_no_wait(
1190                                guard.plugin_params.dropped_arrays, 0, dropped,
1191                            );
1192                            drop(guard);
1193                        }
1194                        None => break,
1195                    }
1196                }
1197                param = param_rx.recv() => {
1198                    match param {
1199                        Some((reason, addr, value)) => {
1200                            if reason == enable_callbacks_reason {
1201                                enabled.store(value.as_i32() != 0, Ordering::Release);
1202                            }
1203                            if reason == blocking_callbacks_reason {
1204                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1205                            }
1206                            // Handle MinCallbackTime param change
1207                            if reason == min_callback_time_reason {
1208                                shared.lock().min_callback_time = value.as_f64();
1209                            }
1210                            // Handle sort param changes
1211                            if reason == sort_mode_reason {
1212                                let mode = value.as_i32();
1213                                let mut guard = shared.lock();
1214                                guard.sort_mode = mode;
1215                                if mode == 0 {
1216                                    // Flush remaining buffered arrays when disabling sort mode
1217                                    guard.flush_sort_buffer();
1218                                    sort_flush_active = false;
1219                                } else {
1220                                    // Activate flush timer if sort_time > 0
1221                                    sort_flush_active = guard.sort_time > 0.0;
1222                                    if sort_flush_active {
1223                                        let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1224                                        sort_flush_interval = tokio::time::interval(dur);
1225                                    }
1226                                }
1227                                drop(guard);
1228                            }
1229                            if reason == sort_time_reason {
1230                                let t = value.as_f64();
1231                                let mut guard = shared.lock();
1232                                guard.sort_time = t;
1233                                if guard.sort_mode != 0 && t > 0.0 {
1234                                    sort_flush_active = true;
1235                                    let dur = std::time::Duration::from_secs_f64(t);
1236                                    sort_flush_interval = tokio::time::interval(dur);
1237                                } else {
1238                                    sort_flush_active = false;
1239                                }
1240                                drop(guard);
1241                            }
1242                            if reason == sort_size_reason {
1243                                shared.lock().sort_size = value.as_i32();
1244                            }
1245                            // Handle NDArrayPort rewiring
1246                            if reason == nd_array_port_reason {
1247                                if let Some(new_port) = value.as_string() {
1248                                    if new_port != current_upstream {
1249                                        let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1250                                        if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1251                                            eprintln!("NDArrayPort rewire failed: {e}");
1252                                            current_upstream = old;
1253                                        }
1254                                    }
1255                                }
1256                            }
1257                            let snapshot = PluginParamSnapshot {
1258                                enable_callbacks: enabled.load(Ordering::Acquire),
1259                                reason,
1260                                addr,
1261                                value,
1262                            };
1263                            let mut guard = shared.lock();
1264                            let t0 = std::time::Instant::now();
1265                            let result = guard.processor.on_param_change(reason, &snapshot);
1266                            let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1267                            if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1268                                guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
1269                            }
1270                            drop(guard);
1271                        }
1272                        None => break,
1273                    }
1274                }
1275                _ = sort_flush_interval.tick(), if sort_flush_active => {
1276                    shared.lock().flush_sort_buffer();
1277                }
1278            }
1279        }
1280    });
1281}
1282
1283/// Connect a downstream plugin's sender to a plugin runtime's output.
1284pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1285    upstream.array_output().lock().add(downstream_sender);
1286}
1287
1288/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
1289pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1290    port_name: &str,
1291    mut processor: P,
1292    pool: Arc<NDArrayPool>,
1293    queue_size: usize,
1294    output: NDArrayOutput,
1295    ndarray_port: &str,
1296    wiring: Arc<WiringRegistry>,
1297) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1298    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1299
1300    let plugin_type_name = processor.plugin_type().to_string();
1301    let array_data = processor.array_data_handle();
1302    let driver = PluginPortDriver::new(
1303        port_name,
1304        &plugin_type_name,
1305        queue_size,
1306        ndarray_port,
1307        1,
1308        param_tx,
1309        &mut processor,
1310        array_data,
1311    )
1312    .expect("failed to create plugin port driver");
1313
1314    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1315    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1316    let min_callback_time_reason = driver.plugin_params.min_callback_time;
1317    let sort_mode_reason = driver.plugin_params.sort_mode;
1318    let sort_time_reason = driver.plugin_params.sort_time;
1319    let sort_size_reason = driver.plugin_params.sort_size;
1320    let ndarray_params = driver.ndarray_params;
1321    let plugin_params = driver.plugin_params;
1322    let std_array_data_param = driver.std_array_data_param;
1323
1324    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1325
1326    let port_handle = port_runtime.port_handle().clone();
1327
1328    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1329
1330    let enabled = Arc::new(AtomicBool::new(false));
1331    let blocking_mode = Arc::new(AtomicBool::new(false));
1332
1333    let array_output = Arc::new(parking_lot::Mutex::new(output));
1334    let array_output_for_handle = array_output.clone();
1335    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1336        processor,
1337        output: array_output,
1338        pool,
1339        ndarray_params,
1340        plugin_params,
1341        port_handle,
1342        array_counter: 0,
1343        std_array_data_param,
1344        min_callback_time: 0.0,
1345        last_process_time: None,
1346        sort_mode: 0,
1347        sort_time: 0.0,
1348        sort_size: 10,
1349        sort_buffer: SortBuffer::new(),
1350        rate_last_counter: 0,
1351        rate_last_time: std::time::Instant::now(),
1352    }));
1353
1354    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1355        inner: shared.clone(),
1356    });
1357
1358    let data_enabled = enabled.clone();
1359    let data_blocking = blocking_mode.clone();
1360
1361    // Capture queue metrics before with_blocking_support consumes the sender
1362    let dropped_count = array_sender.dropped_count_shared();
1363    let queue_tx = array_sender.tx_clone();
1364
1365    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1366
1367    // Capture wiring info for data loop
1368    let nd_array_port_reason = plugin_params.nd_array_port;
1369    let sender_port_name = port_name.to_string();
1370    let initial_upstream = ndarray_port.to_string();
1371
1372    let data_jh = thread::Builder::new()
1373        .name(format!("plugin-data-{port_name}"))
1374        .spawn(move || {
1375            plugin_data_loop(
1376                shared,
1377                array_rx,
1378                param_rx,
1379                enable_callbacks_reason,
1380                blocking_callbacks_reason,
1381                min_callback_time_reason,
1382                sort_mode_reason,
1383                sort_time_reason,
1384                sort_size_reason,
1385                data_enabled,
1386                data_blocking,
1387                nd_array_port_reason,
1388                sender_port_name,
1389                initial_upstream,
1390                wiring,
1391                dropped_count,
1392                queue_tx,
1393            );
1394        })
1395        .expect("failed to spawn plugin data thread");
1396
1397    let handle = PluginRuntimeHandle {
1398        port_runtime,
1399        array_sender,
1400        array_output: array_output_for_handle,
1401        port_name: port_name.to_string(),
1402        ndarray_params,
1403        plugin_params,
1404    };
1405
1406    (handle, data_jh)
1407}
1408
1409#[cfg(test)]
1410mod tests {
1411    use super::*;
1412    use crate::ndarray::{NDDataType, NDDimension};
1413    use crate::plugin::channel::ndarray_channel;
1414
1415    /// Passthrough processor: returns the input array as-is.
1416    struct PassthroughProcessor;
1417
1418    impl NDPluginProcess for PassthroughProcessor {
1419        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1420            ProcessResult::arrays(vec![Arc::new(array.clone())])
1421        }
1422        fn plugin_type(&self) -> &str {
1423            "Passthrough"
1424        }
1425    }
1426
1427    /// Sink processor: consumes arrays, returns nothing.
1428    struct SinkProcessor {
1429        count: usize,
1430    }
1431
1432    impl NDPluginProcess for SinkProcessor {
1433        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1434            self.count += 1;
1435            ProcessResult::empty()
1436        }
1437        fn plugin_type(&self) -> &str {
1438            "Sink"
1439        }
1440    }
1441
1442    fn make_test_array(id: i32) -> Arc<NDArray> {
1443        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1444        arr.unique_id = id;
1445        Arc::new(arr)
1446    }
1447
1448    fn test_wiring() -> Arc<WiringRegistry> {
1449        Arc::new(WiringRegistry::new())
1450    }
1451
1452    /// Enable callbacks on a plugin handle (plugins default to disabled).
1453    fn enable_callbacks(handle: &PluginRuntimeHandle) {
1454        handle
1455            .port_runtime()
1456            .port_handle()
1457            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1458            .unwrap();
1459        std::thread::sleep(std::time::Duration::from_millis(10));
1460    }
1461
1462    #[test]
1463    fn test_passthrough_runtime() {
1464        let pool = Arc::new(NDArrayPool::new(1_000_000));
1465
1466        // Create downstream receiver
1467        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1468        let mut output = NDArrayOutput::new();
1469        output.add(downstream_sender);
1470
1471        let (handle, _data_jh) = create_plugin_runtime_with_output(
1472            "PASS1",
1473            PassthroughProcessor,
1474            pool,
1475            10,
1476            output,
1477            "",
1478            test_wiring(),
1479        );
1480        enable_callbacks(&handle);
1481
1482        // Send an array
1483        handle.array_sender().send(make_test_array(42));
1484
1485        // Should come out the other side
1486        let received = downstream_rx.blocking_recv().unwrap();
1487        assert_eq!(received.unique_id, 42);
1488    }
1489
1490    #[test]
1491    fn test_sink_runtime() {
1492        let pool = Arc::new(NDArrayPool::new(1_000_000));
1493
1494        let (handle, _data_jh) = create_plugin_runtime(
1495            "SINK1",
1496            SinkProcessor { count: 0 },
1497            pool,
1498            10,
1499            "",
1500            test_wiring(),
1501        );
1502        enable_callbacks(&handle);
1503
1504        // Send arrays - they should be consumed silently
1505        handle.array_sender().send(make_test_array(1));
1506        handle.array_sender().send(make_test_array(2));
1507
1508        // Give processing thread time
1509        std::thread::sleep(std::time::Duration::from_millis(50));
1510
1511        // No crash, no output needed
1512        assert_eq!(handle.port_name(), "SINK1");
1513    }
1514
1515    #[test]
1516    fn test_plugin_type_param() {
1517        let pool = Arc::new(NDArrayPool::new(1_000_000));
1518
1519        let (handle, _data_jh) = create_plugin_runtime(
1520            "TYPE_TEST",
1521            PassthroughProcessor,
1522            pool,
1523            10,
1524            "",
1525            test_wiring(),
1526        );
1527
1528        // Verify port name
1529        assert_eq!(handle.port_name(), "TYPE_TEST");
1530        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1531    }
1532
1533    #[test]
1534    fn test_shutdown_on_handle_drop() {
1535        let pool = Arc::new(NDArrayPool::new(1_000_000));
1536
1537        let (handle, data_jh) = create_plugin_runtime(
1538            "SHUTDOWN_TEST",
1539            PassthroughProcessor,
1540            pool,
1541            10,
1542            "",
1543            test_wiring(),
1544        );
1545
1546        // Drop the handle (closes sender channel, which should cause data thread to exit)
1547        let sender = handle.array_sender().clone();
1548        drop(handle);
1549        drop(sender);
1550
1551        // Data thread should terminate
1552        let result = data_jh.join();
1553        assert!(result.is_ok());
1554    }
1555
1556    #[test]
1557    fn test_dropped_count_when_queue_full() {
1558        let pool = Arc::new(NDArrayPool::new(1_000_000));
1559
1560        // Very slow processor
1561        struct SlowProcessor;
1562        impl NDPluginProcess for SlowProcessor {
1563            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1564                std::thread::sleep(std::time::Duration::from_millis(100));
1565                ProcessResult::empty()
1566            }
1567            fn plugin_type(&self) -> &str {
1568                "Slow"
1569            }
1570        }
1571
1572        let (handle, _data_jh) =
1573            create_plugin_runtime("DROP_TEST", SlowProcessor, pool, 1, "", test_wiring());
1574        enable_callbacks(&handle);
1575
1576        // Fill the queue and overflow
1577        for i in 0..10 {
1578            handle.array_sender().send(make_test_array(i));
1579        }
1580
1581        // Some should have been dropped
1582        assert!(handle.array_sender().dropped_count() > 0);
1583    }
1584
1585    #[test]
1586    fn test_blocking_callbacks_basic() {
1587        let pool = Arc::new(NDArrayPool::new(1_000_000));
1588        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1589        let mut output = NDArrayOutput::new();
1590        output.add(downstream_sender);
1591
1592        let (handle, _data_jh) = create_plugin_runtime_with_output(
1593            "BLOCK_TEST",
1594            PassthroughProcessor,
1595            pool,
1596            10,
1597            output,
1598            "",
1599            test_wiring(),
1600        );
1601        enable_callbacks(&handle);
1602
1603        // Enable blocking mode
1604        handle
1605            .port_runtime()
1606            .port_handle()
1607            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1608            .unwrap();
1609        std::thread::sleep(std::time::Duration::from_millis(50));
1610
1611        // In blocking mode, send() processes inline and returns synchronously
1612        handle.array_sender().send(make_test_array(42));
1613
1614        // Array should already be in the downstream channel
1615        let received = downstream_rx.blocking_recv().unwrap();
1616        assert_eq!(received.unique_id, 42);
1617    }
1618
1619    #[test]
1620    fn test_blocking_to_nonblocking_switch() {
1621        let pool = Arc::new(NDArrayPool::new(1_000_000));
1622        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1623        let mut output = NDArrayOutput::new();
1624        output.add(downstream_sender);
1625
1626        let (handle, _data_jh) = create_plugin_runtime_with_output(
1627            "SWITCH_TEST",
1628            PassthroughProcessor,
1629            pool,
1630            10,
1631            output,
1632            "",
1633            test_wiring(),
1634        );
1635        enable_callbacks(&handle);
1636
1637        // Start in blocking mode
1638        handle
1639            .port_runtime()
1640            .port_handle()
1641            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1642            .unwrap();
1643        std::thread::sleep(std::time::Duration::from_millis(50));
1644
1645        handle.array_sender().send(make_test_array(1));
1646        let received = downstream_rx.blocking_recv().unwrap();
1647        assert_eq!(received.unique_id, 1);
1648
1649        // Switch back to non-blocking
1650        handle
1651            .port_runtime()
1652            .port_handle()
1653            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1654            .unwrap();
1655        std::thread::sleep(std::time::Duration::from_millis(50));
1656
1657        // Send in non-blocking mode — goes through channel to data thread
1658        handle.array_sender().send(make_test_array(2));
1659        let received = downstream_rx.blocking_recv().unwrap();
1660        assert_eq!(received.unique_id, 2);
1661    }
1662
1663    #[test]
1664    fn test_enable_callbacks_disables_processing() {
1665        let pool = Arc::new(NDArrayPool::new(1_000_000));
1666        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1667        let mut output = NDArrayOutput::new();
1668        output.add(downstream_sender);
1669
1670        let (handle, _data_jh) = create_plugin_runtime_with_output(
1671            "ENABLE_TEST",
1672            PassthroughProcessor,
1673            pool,
1674            10,
1675            output,
1676            "",
1677            test_wiring(),
1678        );
1679
1680        // Disable callbacks
1681        handle
1682            .port_runtime()
1683            .port_handle()
1684            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1685            .unwrap();
1686        std::thread::sleep(std::time::Duration::from_millis(50));
1687
1688        // Send array — should be silently dropped by sender
1689        handle.array_sender().send(make_test_array(99));
1690
1691        // Verify nothing received (with timeout)
1692        let rt = tokio::runtime::Builder::new_current_thread()
1693            .enable_all()
1694            .build()
1695            .unwrap();
1696        let result = rt.block_on(async {
1697            tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1698        });
1699        assert!(
1700            result.is_err(),
1701            "should not receive array when callbacks disabled"
1702        );
1703    }
1704
1705    #[test]
1706    fn test_blocking_downstream_receives() {
1707        let pool = Arc::new(NDArrayPool::new(1_000_000));
1708
1709        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1710        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1711        let mut output = NDArrayOutput::new();
1712        output.add(ds1);
1713        output.add(ds2);
1714
1715        let (handle, _data_jh) = create_plugin_runtime_with_output(
1716            "BLOCK_DS_TEST",
1717            PassthroughProcessor,
1718            pool,
1719            10,
1720            output,
1721            "",
1722            test_wiring(),
1723        );
1724        enable_callbacks(&handle);
1725
1726        // Enable blocking mode
1727        handle
1728            .port_runtime()
1729            .port_handle()
1730            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1731            .unwrap();
1732        std::thread::sleep(std::time::Duration::from_millis(50));
1733
1734        handle.array_sender().send(make_test_array(77));
1735
1736        // Both downstream receivers should have the array
1737        let r1 = rx1.blocking_recv().unwrap();
1738        let r2 = rx2.blocking_recv().unwrap();
1739        assert_eq!(r1.unique_id, 77);
1740        assert_eq!(r2.unique_id, 77);
1741    }
1742
1743    #[test]
1744    fn test_blocking_param_updates() {
1745        let pool = Arc::new(NDArrayPool::new(1_000_000));
1746
1747        struct ParamTracker;
1748        impl NDPluginProcess for ParamTracker {
1749            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1750                ProcessResult::arrays(vec![Arc::new(array.clone())])
1751            }
1752            fn plugin_type(&self) -> &str {
1753                "ParamTracker"
1754            }
1755        }
1756
1757        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1758        let mut output = NDArrayOutput::new();
1759        output.add(downstream_sender);
1760
1761        let (handle, _data_jh) = create_plugin_runtime_with_output(
1762            "PARAM_TEST",
1763            ParamTracker,
1764            pool,
1765            10,
1766            output,
1767            "",
1768            test_wiring(),
1769        );
1770        enable_callbacks(&handle);
1771
1772        // Enable blocking mode
1773        handle
1774            .port_runtime()
1775            .port_handle()
1776            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1777            .unwrap();
1778        std::thread::sleep(std::time::Duration::from_millis(50));
1779
1780        // Send array in blocking mode
1781        handle.array_sender().send(make_test_array(1));
1782        let received = downstream_rx.blocking_recv().unwrap();
1783        assert_eq!(received.unique_id, 1);
1784
1785        // Write enable_callbacks while in blocking mode — should not crash
1786        handle
1787            .port_runtime()
1788            .port_handle()
1789            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1790            .unwrap();
1791        std::thread::sleep(std::time::Duration::from_millis(50));
1792
1793        // Still works after param update
1794        handle.array_sender().send(make_test_array(2));
1795        let received = downstream_rx.blocking_recv().unwrap();
1796        assert_eq!(received.unique_id, 2);
1797    }
1798
1799    /// Phase 0 regression test: process_and_publish inside a current-thread runtime must not panic.
1800    #[test]
1801    fn test_no_panic_in_current_thread_runtime() {
1802        let pool = Arc::new(NDArrayPool::new(1_000_000));
1803        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1804        let mut output = NDArrayOutput::new();
1805        output.add(downstream_sender);
1806
1807        let (handle, _data_jh) = create_plugin_runtime_with_output(
1808            "CURRENT_THREAD_TEST",
1809            PassthroughProcessor,
1810            pool,
1811            10,
1812            output,
1813            "",
1814            test_wiring(),
1815        );
1816        enable_callbacks(&handle);
1817
1818        // Enable blocking mode so process_and_publish runs inline
1819        handle
1820            .port_runtime()
1821            .port_handle()
1822            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1823            .unwrap();
1824        std::thread::sleep(std::time::Duration::from_millis(50));
1825
1826        // Call send (which calls process_and_publish inline) from inside a current-thread runtime
1827        let rt = tokio::runtime::Builder::new_current_thread()
1828            .enable_all()
1829            .build()
1830            .unwrap();
1831        rt.block_on(async {
1832            handle.array_sender().send(make_test_array(99));
1833        });
1834
1835        let received = downstream_rx.blocking_recv().unwrap();
1836        assert_eq!(received.unique_id, 99);
1837    }
1838
1839    #[test]
1840    fn test_sort_buffer_reorders_by_unique_id() {
1841        let mut buf = SortBuffer::new();
1842
1843        // Insert out of order: 3, 1, 2
1844        buf.insert(3, vec![make_test_array(3)], 10);
1845        buf.insert(1, vec![make_test_array(1)], 10);
1846        buf.insert(2, vec![make_test_array(2)], 10);
1847
1848        assert_eq!(buf.len(), 3);
1849
1850        let drained = buf.drain_all();
1851        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1852        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
1853        assert_eq!(buf.len(), 0);
1854        assert_eq!(buf.last_emitted_id, 3);
1855    }
1856
1857    #[test]
1858    fn test_sort_buffer_detects_disordered() {
1859        let mut buf = SortBuffer::new();
1860
1861        // Emit id=5, then insert id=3 (which is less than last_emitted_id)
1862        buf.insert(5, vec![make_test_array(5)], 10);
1863        buf.drain_all(); // emits id=5, last_emitted_id=5
1864
1865        buf.insert(3, vec![make_test_array(3)], 10);
1866        assert_eq!(buf.disordered_arrays, 1);
1867    }
1868
1869    #[test]
1870    fn test_sort_buffer_drops_when_full() {
1871        let mut buf = SortBuffer::new();
1872
1873        // sort_size=2, insert 3 entries
1874        buf.insert(1, vec![make_test_array(1)], 2);
1875        buf.insert(2, vec![make_test_array(2)], 2);
1876        buf.insert(3, vec![make_test_array(3)], 2);
1877
1878        // Buffer should have 2 entries (oldest dropped)
1879        assert_eq!(buf.len(), 2);
1880        assert_eq!(buf.dropped_output_arrays, 1);
1881
1882        let drained = buf.drain_all();
1883        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1884        assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
1885    }
1886
1887    #[test]
1888    fn test_sort_mode_runtime_integration() {
1889        let pool = Arc::new(NDArrayPool::new(1_000_000));
1890        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1891        let mut output = NDArrayOutput::new();
1892        output.add(downstream_sender);
1893
1894        let (handle, _data_jh) = create_plugin_runtime_with_output(
1895            "SORT_TEST",
1896            PassthroughProcessor,
1897            pool,
1898            10,
1899            output,
1900            "",
1901            test_wiring(),
1902        );
1903        enable_callbacks(&handle);
1904
1905        // Enable sort mode with sort_size=10
1906        handle
1907            .port_runtime()
1908            .port_handle()
1909            .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
1910            .unwrap();
1911        handle
1912            .port_runtime()
1913            .port_handle()
1914            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
1915            .unwrap();
1916        std::thread::sleep(std::time::Duration::from_millis(50));
1917
1918        // Send arrays out of order
1919        handle.array_sender().send(make_test_array(3));
1920        handle.array_sender().send(make_test_array(1));
1921        handle.array_sender().send(make_test_array(2));
1922        std::thread::sleep(std::time::Duration::from_millis(100));
1923
1924        // Arrays should be buffered, not yet received
1925        let rt = tokio::runtime::Builder::new_current_thread()
1926            .enable_all()
1927            .build()
1928            .unwrap();
1929        let result = rt.block_on(async {
1930            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
1931        });
1932        assert!(
1933            result.is_err(),
1934            "arrays should be buffered while sort mode is active"
1935        );
1936
1937        // Disable sort mode — should flush all buffered arrays in order
1938        handle
1939            .port_runtime()
1940            .port_handle()
1941            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 0)
1942            .unwrap();
1943        std::thread::sleep(std::time::Duration::from_millis(100));
1944
1945        // Receive all flushed arrays — they should arrive in sorted order
1946        let r1 = downstream_rx.blocking_recv().unwrap();
1947        let r2 = downstream_rx.blocking_recv().unwrap();
1948        let r3 = downstream_rx.blocking_recv().unwrap();
1949        assert_eq!(r1.unique_id, 1);
1950        assert_eq!(r2.unique_id, 2);
1951        assert_eq!(r3.unique_id, 3);
1952    }
1953}