Skip to main content

ad_core_rs/plugin/
runtime.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{
19    BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
20};
21use super::params::PluginBaseParams;
22use super::wiring::WiringRegistry;
23
24/// Value sent through the param change channel from control plane to data plane.
25#[derive(Debug, Clone)]
26pub enum ParamChangeValue {
27    Int32(i32),
28    Float64(f64),
29    Octet(String),
30}
31
32impl ParamChangeValue {
33    pub fn as_i32(&self) -> i32 {
34        match self {
35            ParamChangeValue::Int32(v) => *v,
36            ParamChangeValue::Float64(v) => *v as i32,
37            ParamChangeValue::Octet(_) => 0,
38        }
39    }
40
41    pub fn as_f64(&self) -> f64 {
42        match self {
43            ParamChangeValue::Int32(v) => *v as f64,
44            ParamChangeValue::Float64(v) => *v,
45            ParamChangeValue::Octet(_) => 0.0,
46        }
47    }
48
49    pub fn as_string(&self) -> Option<&str> {
50        match self {
51            ParamChangeValue::Octet(s) => Some(s),
52            _ => None,
53        }
54    }
55}
56
57/// A single parameter update produced by a plugin's process_array.
58pub enum ParamUpdate {
59    Int32 {
60        reason: usize,
61        addr: i32,
62        value: i32,
63    },
64    Float64 {
65        reason: usize,
66        addr: i32,
67        value: f64,
68    },
69    Octet {
70        reason: usize,
71        addr: i32,
72        value: String,
73    },
74    Float64Array {
75        reason: usize,
76        addr: i32,
77        value: Vec<f64>,
78    },
79}
80
81impl ParamUpdate {
82    /// Create an Int32 update at addr 0.
83    pub fn int32(reason: usize, value: i32) -> Self {
84        Self::Int32 {
85            reason,
86            addr: 0,
87            value,
88        }
89    }
90    /// Create a Float64 update at addr 0.
91    pub fn float64(reason: usize, value: f64) -> Self {
92        Self::Float64 {
93            reason,
94            addr: 0,
95            value,
96        }
97    }
98    /// Create an Int32 update at a specific addr.
99    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
100        Self::Int32 {
101            reason,
102            addr,
103            value,
104        }
105    }
106    /// Create a Float64 update at a specific addr.
107    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
108        Self::Float64 {
109            reason,
110            addr,
111            value,
112        }
113    }
114    /// Create a Float64Array update at addr 0.
115    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
116        Self::Float64Array {
117            reason,
118            addr: 0,
119            value,
120        }
121    }
122    /// Create a Float64Array update at a specific addr.
123    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
124        Self::Float64Array {
125            reason,
126            addr,
127            value,
128        }
129    }
130}
131
132/// Result of processing one array: output arrays + param updates to write back.
133pub struct ProcessResult {
134    pub output_arrays: Vec<Arc<NDArray>>,
135    pub param_updates: Vec<ParamUpdate>,
136    /// If set, only publish to the subscriber at this index (round-robin scatter).
137    pub scatter_index: Option<usize>,
138}
139
140impl ProcessResult {
141    /// Convenience: sink plugin with only param updates, no output arrays.
142    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
143        Self {
144            output_arrays: vec![],
145            param_updates,
146            scatter_index: None,
147        }
148    }
149
150    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
151    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
152        Self {
153            output_arrays,
154            param_updates: vec![],
155            scatter_index: None,
156        }
157    }
158
159    /// Convenience: no outputs, no param updates.
160    pub fn empty() -> Self {
161        Self {
162            output_arrays: vec![],
163            param_updates: vec![],
164            scatter_index: None,
165        }
166    }
167
168    /// Convenience: scatter output — send to a single subscriber by index.
169    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
170        Self {
171            output_arrays,
172            param_updates: vec![],
173            scatter_index: Some(index),
174        }
175    }
176}
177
178/// Result of handling a control-plane param change.
179pub struct ParamChangeResult {
180    pub output_arrays: Vec<Arc<NDArray>>,
181    pub param_updates: Vec<ParamUpdate>,
182}
183
184impl ParamChangeResult {
185    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
186        Self {
187            output_arrays: vec![],
188            param_updates,
189        }
190    }
191
192    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
193        Self {
194            output_arrays,
195            param_updates: vec![],
196        }
197    }
198
199    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
200        Self {
201            output_arrays,
202            param_updates,
203        }
204    }
205
206    pub fn empty() -> Self {
207        Self {
208            output_arrays: vec![],
209            param_updates: vec![],
210        }
211    }
212}
213
214/// Pure processing logic. No threading concerns.
215pub trait NDPluginProcess: Send + 'static {
216    /// Process one array. Return output arrays and param updates.
217    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
218
219    /// Plugin type name for PLUGIN_TYPE param.
220    fn plugin_type(&self) -> &str;
221
222    /// Register plugin-specific params on the base. Called once during construction.
223    fn register_params(
224        &mut self,
225        _base: &mut PortDriverBase,
226    ) -> Result<(), asyn_rs::error::AsynError> {
227        Ok(())
228    }
229
230    /// Called when a param changes. Reason is the param index.
231    /// Return param updates to be written back to the port driver.
232    fn on_param_change(
233        &mut self,
234        _reason: usize,
235        _params: &PluginParamSnapshot,
236    ) -> ParamChangeResult {
237        ParamChangeResult::empty()
238    }
239
240    /// Return a handle to the latest NDArray data for array reads.
241    /// Override this in plugins like NDPluginStdArrays that serve pixel data
242    /// via readInt8Array/readInt16Array/etc.
243    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
244        None
245    }
246}
247
248/// Read-only snapshot of param values available to the processing thread.
249pub struct PluginParamSnapshot {
250    pub enable_callbacks: bool,
251    /// The param reason that changed.
252    pub reason: usize,
253    /// The address (sub-device) that changed.
254    pub addr: i32,
255    /// The new value.
256    pub value: ParamChangeValue,
257}
258
259/// Sort buffer for reordering output arrays by uniqueId.
260///
261/// When sort_mode is enabled, output arrays are inserted into a BTreeMap
262/// keyed by uniqueId instead of being sent directly. A periodic flush task
263/// drains arrays in uniqueId order.
264struct SortBuffer {
265    /// Buffered arrays keyed by uniqueId, ordered by BTreeMap.
266    entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
267    /// The last uniqueId that was emitted (for detecting disordered arrays).
268    last_emitted_id: i32,
269    /// Counter of arrays received out of order (uniqueId < last_emitted_id).
270    disordered_arrays: i32,
271    /// Counter of arrays dropped because the buffer was full.
272    dropped_output_arrays: i32,
273}
274
275impl SortBuffer {
276    fn new() -> Self {
277        Self {
278            entries: BTreeMap::new(),
279            last_emitted_id: 0,
280            disordered_arrays: 0,
281            dropped_output_arrays: 0,
282        }
283    }
284
285    /// Insert arrays into the sort buffer. If buffer exceeds sort_size, drop oldest entries.
286    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
287        if unique_id < self.last_emitted_id {
288            self.disordered_arrays += 1;
289        }
290        self.entries.entry(unique_id).or_default().extend(arrays);
291
292        // Enforce sort_size limit by dropping oldest entries
293        while sort_size > 0 && self.entries.len() as i32 > sort_size {
294            if let Some((&oldest_key, _)) = self.entries.iter().next() {
295                self.entries.remove(&oldest_key);
296                self.dropped_output_arrays += 1;
297            }
298        }
299    }
300
301    /// Drain all buffered arrays in uniqueId order. Returns them as (uniqueId, arrays) pairs.
302    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
303        let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
304        if let Some(&(last_id, _)) = entries.last() {
305            self.last_emitted_id = last_id;
306        }
307        entries
308    }
309
310    /// Number of uniqueId entries currently buffered.
311    fn len(&self) -> i32 {
312        self.entries.len() as i32
313    }
314}
315
316/// Shared processor state protected by a mutex, accessible from both
317/// the data thread (non-blocking mode) and the caller thread (blocking mode).
318struct SharedProcessorInner<P: NDPluginProcess> {
319    processor: P,
320    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
321    pool: Arc<NDArrayPool>,
322    ndarray_params: NDArrayDriverParams,
323    plugin_params: PluginBaseParams,
324    port_handle: PortHandle,
325    array_counter: i32,
326    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
327    std_array_data_param: Option<usize>,
328    /// MinCallbackTime throttling: minimum seconds between process calls.
329    min_callback_time: f64,
330    /// Last time process_and_publish was called (for throttling).
331    last_process_time: Option<std::time::Instant>,
332    /// Sort mode: 0 = disabled, 1 = sorted output.
333    sort_mode: i32,
334    /// Sort time: seconds between periodic flushes of the sort buffer.
335    sort_time: f64,
336    /// Sort size: maximum number of uniqueId entries in the sort buffer.
337    sort_size: i32,
338    /// Sort buffer for reordering output arrays by uniqueId.
339    sort_buffer: SortBuffer,
340}
341
342impl<P: NDPluginProcess> SharedProcessorInner<P> {
343    fn should_throttle(&self) -> bool {
344        if self.min_callback_time <= 0.0 {
345            return false;
346        }
347        if let Some(last) = self.last_process_time {
348            last.elapsed().as_secs_f64() < self.min_callback_time
349        } else {
350            false
351        }
352    }
353
354    fn process_and_publish(&mut self, array: &NDArray) {
355        if self.should_throttle() {
356            return;
357        }
358        let t0 = std::time::Instant::now();
359        let result = self.processor.process_array(array, &self.pool);
360        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
361        self.last_process_time = Some(t0);
362
363        if self.sort_mode != 0 && !result.output_arrays.is_empty() {
364            // Insert into sort buffer instead of publishing directly
365            let unique_id = array.unique_id;
366            self.sort_buffer
367                .insert(unique_id, result.output_arrays, self.sort_size);
368            // Update sort stats params
369            self.update_sort_params();
370            // Still publish param updates immediately
371            if !result.param_updates.is_empty() {
372                self.publish_result(
373                    vec![],
374                    result.param_updates,
375                    result.scatter_index,
376                    Some(array),
377                    elapsed_ms,
378                );
379            }
380        } else {
381            self.publish_result(
382                result.output_arrays,
383                result.param_updates,
384                result.scatter_index,
385                Some(array),
386                elapsed_ms,
387            );
388        }
389    }
390
391    /// Flush the sort buffer: drain all arrays in uniqueId order and publish them.
392    fn flush_sort_buffer(&mut self) {
393        let entries = self.sort_buffer.drain_all();
394        for (_unique_id, arrays) in entries {
395            self.publish_result(arrays, vec![], None, None, 0.0);
396        }
397        self.update_sort_params();
398    }
399
400    /// Update sort-related param values (SortFree, DisorderedArrays, DroppedOutputArrays).
401    fn update_sort_params(&self) {
402        let sort_free = self.sort_size - self.sort_buffer.len();
403        self.port_handle
404            .write_int32_no_wait(self.plugin_params.sort_free, 0, sort_free);
405        self.port_handle.write_int32_no_wait(
406            self.plugin_params.disordered_arrays,
407            0,
408            self.sort_buffer.disordered_arrays,
409        );
410        self.port_handle.write_int32_no_wait(
411            self.plugin_params.dropped_output_arrays,
412            0,
413            self.sort_buffer.dropped_output_arrays,
414        );
415    }
416
417    fn publish_result(
418        &mut self,
419        output_arrays: Vec<Arc<NDArray>>,
420        param_updates: Vec<ParamUpdate>,
421        scatter_index: Option<usize>,
422        fallback_array: Option<&NDArray>,
423        elapsed_ms: f64,
424    ) {
425        let output = self.output.lock();
426        for out in &output_arrays {
427            if let Some(idx) = scatter_index {
428                output.publish_to(idx, out.clone());
429            } else {
430                output.publish(out.clone());
431            }
432        }
433        drop(output);
434
435        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
436            self.array_counter += 1;
437
438            // Fire array data interrupt directly (C EPICS pattern).
439            // Bypasses port actor channel to avoid dropping large array messages.
440            if let Some(param) = self.std_array_data_param {
441                use crate::ndarray::NDDataBuffer;
442                use asyn_rs::param::ParamValue;
443                let value = match &report_arr.data {
444                    NDDataBuffer::I8(v) => {
445                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
446                    }
447                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
448                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
449                    ))),
450                    NDDataBuffer::I16(v) => {
451                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
452                    }
453                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
454                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
455                    ))),
456                    NDDataBuffer::I32(v) => {
457                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
458                    }
459                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
460                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
461                    ))),
462                    NDDataBuffer::I64(v) => {
463                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
464                    }
465                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
466                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
467                    ))),
468                    NDDataBuffer::F32(v) => {
469                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
470                    }
471                    NDDataBuffer::F64(v) => {
472                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
473                    }
474                };
475                if let Some(value) = value {
476                    let ts = report_arr.timestamp.to_system_time();
477                    self.port_handle
478                        .interrupts()
479                        .notify(asyn_rs::interrupt::InterruptValue {
480                            reason: param,
481                            addr: 0,
482                            value,
483                            timestamp: ts,
484                            uint32_changed_mask: 0,
485                        });
486                }
487            }
488
489            let info = report_arr.info();
490            let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
491            self.port_handle.write_int32_no_wait(
492                self.ndarray_params.array_counter,
493                0,
494                self.array_counter,
495            );
496            self.port_handle.write_int32_no_wait(
497                self.ndarray_params.unique_id,
498                0,
499                report_arr.unique_id,
500            );
501            self.port_handle.write_int32_no_wait(
502                self.ndarray_params.n_dimensions,
503                0,
504                report_arr.dims.len() as i32,
505            );
506            self.port_handle.write_int32_no_wait(
507                self.ndarray_params.array_size_x,
508                0,
509                info.x_size as i32,
510            );
511            self.port_handle.write_int32_no_wait(
512                self.ndarray_params.array_size_y,
513                0,
514                info.y_size as i32,
515            );
516            self.port_handle.write_int32_no_wait(
517                self.ndarray_params.array_size_z,
518                0,
519                info.color_size as i32,
520            );
521            self.port_handle.write_int32_no_wait(
522                self.ndarray_params.array_size,
523                0,
524                info.total_bytes as i32,
525            );
526            self.port_handle.write_int32_no_wait(
527                self.ndarray_params.data_type,
528                0,
529                report_arr.data.data_type() as i32,
530            );
531            self.port_handle
532                .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
533
534            let ts_f64 = report_arr.timestamp.as_f64();
535            self.port_handle
536                .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
537            self.port_handle.write_int32_no_wait(
538                self.ndarray_params.epics_ts_sec,
539                0,
540                report_arr.timestamp.sec as i32,
541            );
542            self.port_handle.write_int32_no_wait(
543                self.ndarray_params.epics_ts_nsec,
544                0,
545                report_arr.timestamp.nsec as i32,
546            );
547        }
548
549        self.port_handle
550            .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
551
552        // Set params directly and fire callbacks — no writeInt32/on_param_change re-entrancy.
553        // This mirrors C ADCore's setIntegerParam + callParamCallbacks pattern.
554        use asyn_rs::request::ParamSetValue;
555
556        let mut addr0_updates: Vec<ParamSetValue> = Vec::new();
557        let mut extra_addr_map: std::collections::HashMap<i32, Vec<ParamSetValue>> =
558            std::collections::HashMap::new();
559
560        for update in &param_updates {
561            match update {
562                ParamUpdate::Int32 {
563                    reason,
564                    addr,
565                    value,
566                } => {
567                    let pv = ParamSetValue::Int32 {
568                        reason: *reason,
569                        addr: *addr,
570                        value: *value,
571                    };
572                    if *addr == 0 {
573                        addr0_updates.push(pv);
574                    } else {
575                        extra_addr_map.entry(*addr).or_default().push(pv);
576                    }
577                }
578                ParamUpdate::Float64 {
579                    reason,
580                    addr,
581                    value,
582                } => {
583                    let pv = ParamSetValue::Float64 {
584                        reason: *reason,
585                        addr: *addr,
586                        value: *value,
587                    };
588                    if *addr == 0 {
589                        addr0_updates.push(pv);
590                    } else {
591                        extra_addr_map.entry(*addr).or_default().push(pv);
592                    }
593                }
594                ParamUpdate::Octet {
595                    reason,
596                    addr,
597                    value,
598                } => {
599                    let pv = ParamSetValue::Octet {
600                        reason: *reason,
601                        addr: *addr,
602                        value: value.clone(),
603                    };
604                    if *addr == 0 {
605                        addr0_updates.push(pv);
606                    } else {
607                        extra_addr_map.entry(*addr).or_default().push(pv);
608                    }
609                }
610                ParamUpdate::Float64Array {
611                    reason,
612                    addr,
613                    value,
614                } => {
615                    let pv = ParamSetValue::Float64Array {
616                        reason: *reason,
617                        addr: *addr,
618                        value: value.clone(),
619                    };
620                    if *addr == 0 {
621                        addr0_updates.push(pv);
622                    } else {
623                        extra_addr_map.entry(*addr).or_default().push(pv);
624                    }
625                }
626            }
627        }
628
629        self.port_handle.set_params_and_notify(0, addr0_updates);
630        for (addr, updates) in extra_addr_map {
631            self.port_handle.set_params_and_notify(addr, updates);
632        }
633    }
634}
635
636/// Type-erased handle for blocking mode: allows NDArraySender to call
637/// process_and_publish without knowing the concrete processor type.
638struct BlockingProcessorHandle<P: NDPluginProcess> {
639    inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
640}
641
642impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
643    fn process_and_publish(&self, array: &NDArray) {
644        self.inner.lock().process_and_publish(array);
645    }
646}
647
648/// PortDriver implementation for a plugin's control plane.
649#[allow(dead_code)]
650pub struct PluginPortDriver {
651    base: PortDriverBase,
652    ndarray_params: NDArrayDriverParams,
653    plugin_params: PluginBaseParams,
654    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
655    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
656    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
657    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
658    std_array_data_param: Option<usize>,
659}
660
661impl PluginPortDriver {
662    fn new<P: NDPluginProcess>(
663        port_name: &str,
664        plugin_type_name: &str,
665        queue_size: usize,
666        ndarray_port: &str,
667        max_addr: usize,
668        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
669        processor: &mut P,
670        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
671    ) -> AsynResult<Self> {
672        let mut base = PortDriverBase::new(
673            port_name,
674            max_addr,
675            PortFlags {
676                can_block: true,
677                ..Default::default()
678            },
679        );
680
681        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
682        let plugin_params = PluginBaseParams::create(&mut base)?;
683
684        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
685        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
686        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
687        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
688        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
689        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
690        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
691        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
692        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
693        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
694        base.set_int32_param(ndarray_params.capture, 0, 0)?;
695        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
696        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
697        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
698        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
699        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
700        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
701        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
702        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
703        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
704        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
705
706        // Set plugin identity params
707        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
708        if !ndarray_port.is_empty() {
709            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
710        }
711
712        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
713        let std_array_data_param = if array_data.is_some() {
714            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
715        } else {
716            None
717        };
718
719        // Let the processor register its plugin-specific params
720        processor.register_params(&mut base)?;
721
722        Ok(Self {
723            base,
724            ndarray_params,
725            plugin_params,
726            param_change_tx,
727            array_data,
728            std_array_data_param,
729        })
730    }
731}
732
733/// Copy source slice directly into destination buffer, returning elements copied.
734fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
735    let n = src.len().min(dst.len());
736    dst[..n].copy_from_slice(&src[..n]);
737    n
738}
739
740/// Convert and copy source slice into destination buffer element-by-element.
741fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
742where
743    S: CastToF64 + Copy,
744    D: CastFromF64 + Copy,
745{
746    let n = src.len().min(dst.len());
747    for i in 0..n {
748        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
749    }
750    n
751}
752
753/// Helper trait for `as f64` casts (handles lossy conversions like i64/u64).
754trait CastToF64 {
755    fn cast_to_f64(self) -> f64;
756}
757
758impl CastToF64 for i8 {
759    fn cast_to_f64(self) -> f64 {
760        self as f64
761    }
762}
763impl CastToF64 for u8 {
764    fn cast_to_f64(self) -> f64 {
765        self as f64
766    }
767}
768impl CastToF64 for i16 {
769    fn cast_to_f64(self) -> f64 {
770        self as f64
771    }
772}
773impl CastToF64 for u16 {
774    fn cast_to_f64(self) -> f64 {
775        self as f64
776    }
777}
778impl CastToF64 for i32 {
779    fn cast_to_f64(self) -> f64 {
780        self as f64
781    }
782}
783impl CastToF64 for u32 {
784    fn cast_to_f64(self) -> f64 {
785        self as f64
786    }
787}
788impl CastToF64 for i64 {
789    fn cast_to_f64(self) -> f64 {
790        self as f64
791    }
792}
793impl CastToF64 for u64 {
794    fn cast_to_f64(self) -> f64 {
795        self as f64
796    }
797}
798impl CastToF64 for f32 {
799    fn cast_to_f64(self) -> f64 {
800        self as f64
801    }
802}
803impl CastToF64 for f64 {
804    fn cast_to_f64(self) -> f64 {
805        self
806    }
807}
808
809/// Helper trait for `as` casts from f64.
810trait CastFromF64 {
811    fn cast_from_f64(v: f64) -> Self;
812}
813
814impl CastFromF64 for i8 {
815    fn cast_from_f64(v: f64) -> Self {
816        v as i8
817    }
818}
819impl CastFromF64 for i16 {
820    fn cast_from_f64(v: f64) -> Self {
821        v as i16
822    }
823}
824impl CastFromF64 for i32 {
825    fn cast_from_f64(v: f64) -> Self {
826        v as i32
827    }
828}
829impl CastFromF64 for f32 {
830    fn cast_from_f64(v: f64) -> Self {
831        v as f32
832    }
833}
834impl CastFromF64 for f64 {
835    fn cast_from_f64(v: f64) -> Self {
836        v
837    }
838}
839
840/// Copy NDArray data into the output buffer with type conversion.
841/// Returns the number of elements copied, or 0 if no data is available.
842macro_rules! impl_read_array {
843    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
844        use crate::ndarray::NDDataBuffer;
845        let handle = match &$self.array_data {
846            Some(h) => h,
847            None => return Ok(0),
848        };
849        let guard = handle.lock();
850        let array = match &*guard {
851            Some(a) => a,
852            None => return Ok(0),
853        };
854        let n = match &array.data {
855            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
856            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
857        };
858        Ok(n)
859    }};
860}
861
862impl PortDriver for PluginPortDriver {
863    fn base(&self) -> &PortDriverBase {
864        &self.base
865    }
866
867    fn base_mut(&mut self) -> &mut PortDriverBase {
868        &mut self.base
869    }
870
871    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
872        let reason = user.reason;
873        let addr = user.addr;
874        self.base.set_int32_param(reason, addr, value)?;
875        self.base.call_param_callbacks(addr)?;
876        let _ = self
877            .param_change_tx
878            .try_send((reason, addr, ParamChangeValue::Int32(value)));
879        Ok(())
880    }
881
882    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
883        let reason = user.reason;
884        let addr = user.addr;
885        self.base.set_float64_param(reason, addr, value)?;
886        self.base.call_param_callbacks(addr)?;
887        let _ = self
888            .param_change_tx
889            .try_send((reason, addr, ParamChangeValue::Float64(value)));
890        Ok(())
891    }
892
893    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
894        let reason = user.reason;
895        let addr = user.addr;
896        let s = String::from_utf8_lossy(data).into_owned();
897        self.base.set_string_param(reason, addr, s.clone())?;
898        self.base.call_param_callbacks(addr)?;
899        let _ = self
900            .param_change_tx
901            .try_send((reason, addr, ParamChangeValue::Octet(s)));
902        Ok(())
903    }
904
905    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
906        impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
907    }
908
909    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
910        impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
911    }
912
913    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
914        impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
915    }
916
917    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
918        impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
919    }
920
921    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
922        impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
923    }
924}
925
926/// Handle to a running plugin runtime. Provides access to sender and port handle.
927#[derive(Clone)]
928pub struct PluginRuntimeHandle {
929    port_runtime: PortRuntimeHandle,
930    array_sender: NDArraySender,
931    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
932    port_name: String,
933    pub ndarray_params: NDArrayDriverParams,
934    pub plugin_params: PluginBaseParams,
935}
936
937impl PluginRuntimeHandle {
938    pub fn port_runtime(&self) -> &PortRuntimeHandle {
939        &self.port_runtime
940    }
941
942    pub fn array_sender(&self) -> &NDArraySender {
943        &self.array_sender
944    }
945
946    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
947        &self.array_output
948    }
949
950    pub fn port_name(&self) -> &str {
951        &self.port_name
952    }
953}
954
955/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
956///
957/// Returns:
958/// - `PluginRuntimeHandle` for wiring and control
959/// - `PortRuntimeHandle` for param I/O
960/// - `JoinHandle` for the data processing thread
961pub fn create_plugin_runtime<P: NDPluginProcess>(
962    port_name: &str,
963    processor: P,
964    pool: Arc<NDArrayPool>,
965    queue_size: usize,
966    ndarray_port: &str,
967    wiring: Arc<WiringRegistry>,
968) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
969    create_plugin_runtime_multi_addr(
970        port_name,
971        processor,
972        pool,
973        queue_size,
974        ndarray_port,
975        wiring,
976        1,
977    )
978}
979
980/// Create a plugin runtime with multi-addr support.
981///
982/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
983pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
984    port_name: &str,
985    mut processor: P,
986    pool: Arc<NDArrayPool>,
987    queue_size: usize,
988    ndarray_port: &str,
989    wiring: Arc<WiringRegistry>,
990    max_addr: usize,
991) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
992    // Param change channel (control plane -> data plane)
993    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
994
995    // Capture plugin type and array data handle before mutable borrow
996    let plugin_type_name = processor.plugin_type().to_string();
997    let array_data = processor.array_data_handle();
998
999    // Create the port driver for control plane
1000    let driver = PluginPortDriver::new(
1001        port_name,
1002        &plugin_type_name,
1003        queue_size,
1004        ndarray_port,
1005        max_addr,
1006        param_tx,
1007        &mut processor,
1008        array_data,
1009    )
1010    .expect("failed to create plugin port driver");
1011
1012    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1013    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1014    let min_callback_time_reason = driver.plugin_params.min_callback_time;
1015    let sort_mode_reason = driver.plugin_params.sort_mode;
1016    let sort_time_reason = driver.plugin_params.sort_time;
1017    let sort_size_reason = driver.plugin_params.sort_size;
1018    let ndarray_params = driver.ndarray_params;
1019    let plugin_params = driver.plugin_params;
1020    let std_array_data_param = driver.std_array_data_param;
1021
1022    // Create port runtime (actor thread for param I/O)
1023    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1024
1025    // Clone port handle for the data thread to write params back
1026    let port_handle = port_runtime.port_handle().clone();
1027
1028    // Array channel (data plane)
1029    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1030
1031    // Shared mode flags
1032    let enabled = Arc::new(AtomicBool::new(false));
1033    let blocking_mode = Arc::new(AtomicBool::new(false));
1034
1035    // Shared processor (accessible from both data thread and caller thread)
1036    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1037    let array_output_for_handle = array_output.clone();
1038    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1039        processor,
1040        output: array_output,
1041        pool,
1042        ndarray_params,
1043        plugin_params,
1044        port_handle,
1045        array_counter: 0,
1046        std_array_data_param,
1047        min_callback_time: 0.0,
1048        last_process_time: None,
1049        sort_mode: 0,
1050        sort_time: 0.0,
1051        sort_size: 10,
1052        sort_buffer: SortBuffer::new(),
1053    }));
1054
1055    // Type-erased handle for blocking mode
1056    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1057        inner: shared.clone(),
1058    });
1059
1060    let data_enabled = enabled.clone();
1061    let data_blocking = blocking_mode.clone();
1062
1063    // Capture queue metrics before with_blocking_support consumes the sender
1064    let dropped_count = array_sender.dropped_count_shared();
1065    let queue_tx = array_sender.tx_clone();
1066
1067    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1068
1069    // Capture wiring info for data loop
1070    let nd_array_port_reason = plugin_params.nd_array_port;
1071    let sender_port_name = port_name.to_string();
1072    let initial_upstream = ndarray_port.to_string();
1073
1074    // Spawn data processing thread
1075    let data_jh = thread::Builder::new()
1076        .name(format!("plugin-data-{port_name}"))
1077        .spawn(move || {
1078            plugin_data_loop(
1079                shared,
1080                array_rx,
1081                param_rx,
1082                enable_callbacks_reason,
1083                blocking_callbacks_reason,
1084                min_callback_time_reason,
1085                sort_mode_reason,
1086                sort_time_reason,
1087                sort_size_reason,
1088                data_enabled,
1089                data_blocking,
1090                nd_array_port_reason,
1091                sender_port_name,
1092                initial_upstream,
1093                wiring,
1094                dropped_count,
1095                queue_tx,
1096            );
1097        })
1098        .expect("failed to spawn plugin data thread");
1099
1100    let handle = PluginRuntimeHandle {
1101        port_runtime,
1102        array_sender,
1103        array_output: array_output_for_handle,
1104        port_name: port_name.to_string(),
1105        ndarray_params,
1106        plugin_params,
1107    };
1108
1109    (handle, data_jh)
1110}
1111
1112fn plugin_data_loop<P: NDPluginProcess>(
1113    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1114    mut array_rx: NDArrayReceiver,
1115    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1116    enable_callbacks_reason: usize,
1117    blocking_callbacks_reason: usize,
1118    min_callback_time_reason: usize,
1119    sort_mode_reason: usize,
1120    sort_time_reason: usize,
1121    sort_size_reason: usize,
1122    enabled: Arc<AtomicBool>,
1123    blocking_mode: Arc<AtomicBool>,
1124    nd_array_port_reason: usize,
1125    sender_port_name: String,
1126    initial_upstream: String,
1127    wiring: Arc<WiringRegistry>,
1128    dropped_count: Arc<std::sync::atomic::AtomicU64>,
1129    queue_tx: tokio::sync::mpsc::Sender<super::channel::ArrayMessage>,
1130) {
1131    let mut current_upstream = initial_upstream;
1132    let rt = tokio::runtime::Builder::new_current_thread()
1133        .enable_all()
1134        .build()
1135        .unwrap();
1136    rt.block_on(async {
1137        // Sort flush timer — starts disabled (very long interval).
1138        // Re-created when sort_time changes.
1139        let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1140        let mut sort_flush_active = false;
1141
1142        loop {
1143            tokio::select! {
1144                msg = array_rx.recv_msg() => {
1145                    match msg {
1146                        Some(msg) => {
1147                            // In blocking mode, arrays are processed inline by the caller.
1148                            // Skip processing here to avoid double-processing.
1149                            if !blocking_mode.load(Ordering::Acquire) {
1150                                shared.lock().process_and_publish(&msg.array);
1151                            }
1152                            // msg dropped here → completion signaled (if tracked)
1153
1154                            // Update queue metrics (C parity: DroppedArrays + QueueFree)
1155                            let guard = shared.lock();
1156                            let queue_free = queue_tx.capacity() as i32;
1157                            let dropped = dropped_count.load(Ordering::Relaxed) as i32;
1158                            guard.port_handle.write_int32_no_wait(
1159                                guard.plugin_params.queue_use, 0, queue_free,
1160                            );
1161                            guard.port_handle.write_int32_no_wait(
1162                                guard.plugin_params.dropped_arrays, 0, dropped,
1163                            );
1164                            drop(guard);
1165                        }
1166                        None => break,
1167                    }
1168                }
1169                param = param_rx.recv() => {
1170                    match param {
1171                        Some((reason, addr, value)) => {
1172                            if reason == enable_callbacks_reason {
1173                                enabled.store(value.as_i32() != 0, Ordering::Release);
1174                            }
1175                            if reason == blocking_callbacks_reason {
1176                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1177                            }
1178                            // Handle MinCallbackTime param change
1179                            if reason == min_callback_time_reason {
1180                                shared.lock().min_callback_time = value.as_f64();
1181                            }
1182                            // Handle sort param changes
1183                            if reason == sort_mode_reason {
1184                                let mode = value.as_i32();
1185                                let mut guard = shared.lock();
1186                                guard.sort_mode = mode;
1187                                if mode == 0 {
1188                                    // Flush remaining buffered arrays when disabling sort mode
1189                                    guard.flush_sort_buffer();
1190                                    sort_flush_active = false;
1191                                } else {
1192                                    // Activate flush timer if sort_time > 0
1193                                    sort_flush_active = guard.sort_time > 0.0;
1194                                    if sort_flush_active {
1195                                        let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1196                                        sort_flush_interval = tokio::time::interval(dur);
1197                                    }
1198                                }
1199                                drop(guard);
1200                            }
1201                            if reason == sort_time_reason {
1202                                let t = value.as_f64();
1203                                let mut guard = shared.lock();
1204                                guard.sort_time = t;
1205                                if guard.sort_mode != 0 && t > 0.0 {
1206                                    sort_flush_active = true;
1207                                    let dur = std::time::Duration::from_secs_f64(t);
1208                                    sort_flush_interval = tokio::time::interval(dur);
1209                                } else {
1210                                    sort_flush_active = false;
1211                                }
1212                                drop(guard);
1213                            }
1214                            if reason == sort_size_reason {
1215                                shared.lock().sort_size = value.as_i32();
1216                            }
1217                            // Handle NDArrayPort rewiring
1218                            if reason == nd_array_port_reason {
1219                                if let Some(new_port) = value.as_string() {
1220                                    if new_port != current_upstream {
1221                                        let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1222                                        if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1223                                            eprintln!("NDArrayPort rewire failed: {e}");
1224                                            current_upstream = old;
1225                                        }
1226                                    }
1227                                }
1228                            }
1229                            let snapshot = PluginParamSnapshot {
1230                                enable_callbacks: enabled.load(Ordering::Acquire),
1231                                reason,
1232                                addr,
1233                                value,
1234                            };
1235                            let mut guard = shared.lock();
1236                            let t0 = std::time::Instant::now();
1237                            let result = guard.processor.on_param_change(reason, &snapshot);
1238                            let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1239                            if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1240                                guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
1241                            }
1242                            drop(guard);
1243                        }
1244                        None => break,
1245                    }
1246                }
1247                _ = sort_flush_interval.tick(), if sort_flush_active => {
1248                    shared.lock().flush_sort_buffer();
1249                }
1250            }
1251        }
1252    });
1253}
1254
1255/// Connect a downstream plugin's sender to a plugin runtime's output.
1256pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1257    upstream.array_output().lock().add(downstream_sender);
1258}
1259
1260/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
1261pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1262    port_name: &str,
1263    mut processor: P,
1264    pool: Arc<NDArrayPool>,
1265    queue_size: usize,
1266    output: NDArrayOutput,
1267    ndarray_port: &str,
1268    wiring: Arc<WiringRegistry>,
1269) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1270    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1271
1272    let plugin_type_name = processor.plugin_type().to_string();
1273    let array_data = processor.array_data_handle();
1274    let driver = PluginPortDriver::new(
1275        port_name,
1276        &plugin_type_name,
1277        queue_size,
1278        ndarray_port,
1279        1,
1280        param_tx,
1281        &mut processor,
1282        array_data,
1283    )
1284    .expect("failed to create plugin port driver");
1285
1286    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1287    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1288    let min_callback_time_reason = driver.plugin_params.min_callback_time;
1289    let sort_mode_reason = driver.plugin_params.sort_mode;
1290    let sort_time_reason = driver.plugin_params.sort_time;
1291    let sort_size_reason = driver.plugin_params.sort_size;
1292    let ndarray_params = driver.ndarray_params;
1293    let plugin_params = driver.plugin_params;
1294    let std_array_data_param = driver.std_array_data_param;
1295
1296    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1297
1298    let port_handle = port_runtime.port_handle().clone();
1299
1300    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1301
1302    let enabled = Arc::new(AtomicBool::new(false));
1303    let blocking_mode = Arc::new(AtomicBool::new(false));
1304
1305    let array_output = Arc::new(parking_lot::Mutex::new(output));
1306    let array_output_for_handle = array_output.clone();
1307    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1308        processor,
1309        output: array_output,
1310        pool,
1311        ndarray_params,
1312        plugin_params,
1313        port_handle,
1314        array_counter: 0,
1315        std_array_data_param,
1316        min_callback_time: 0.0,
1317        last_process_time: None,
1318        sort_mode: 0,
1319        sort_time: 0.0,
1320        sort_size: 10,
1321        sort_buffer: SortBuffer::new(),
1322    }));
1323
1324    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1325        inner: shared.clone(),
1326    });
1327
1328    let data_enabled = enabled.clone();
1329    let data_blocking = blocking_mode.clone();
1330
1331    // Capture queue metrics before with_blocking_support consumes the sender
1332    let dropped_count = array_sender.dropped_count_shared();
1333    let queue_tx = array_sender.tx_clone();
1334
1335    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1336
1337    // Capture wiring info for data loop
1338    let nd_array_port_reason = plugin_params.nd_array_port;
1339    let sender_port_name = port_name.to_string();
1340    let initial_upstream = ndarray_port.to_string();
1341
1342    let data_jh = thread::Builder::new()
1343        .name(format!("plugin-data-{port_name}"))
1344        .spawn(move || {
1345            plugin_data_loop(
1346                shared,
1347                array_rx,
1348                param_rx,
1349                enable_callbacks_reason,
1350                blocking_callbacks_reason,
1351                min_callback_time_reason,
1352                sort_mode_reason,
1353                sort_time_reason,
1354                sort_size_reason,
1355                data_enabled,
1356                data_blocking,
1357                nd_array_port_reason,
1358                sender_port_name,
1359                initial_upstream,
1360                wiring,
1361                dropped_count,
1362                queue_tx,
1363            );
1364        })
1365        .expect("failed to spawn plugin data thread");
1366
1367    let handle = PluginRuntimeHandle {
1368        port_runtime,
1369        array_sender,
1370        array_output: array_output_for_handle,
1371        port_name: port_name.to_string(),
1372        ndarray_params,
1373        plugin_params,
1374    };
1375
1376    (handle, data_jh)
1377}
1378
1379#[cfg(test)]
1380mod tests {
1381    use super::*;
1382    use crate::ndarray::{NDDataType, NDDimension};
1383    use crate::plugin::channel::ndarray_channel;
1384
1385    /// Passthrough processor: returns the input array as-is.
1386    struct PassthroughProcessor;
1387
1388    impl NDPluginProcess for PassthroughProcessor {
1389        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1390            ProcessResult::arrays(vec![Arc::new(array.clone())])
1391        }
1392        fn plugin_type(&self) -> &str {
1393            "Passthrough"
1394        }
1395    }
1396
1397    /// Sink processor: consumes arrays, returns nothing.
1398    struct SinkProcessor {
1399        count: usize,
1400    }
1401
1402    impl NDPluginProcess for SinkProcessor {
1403        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1404            self.count += 1;
1405            ProcessResult::empty()
1406        }
1407        fn plugin_type(&self) -> &str {
1408            "Sink"
1409        }
1410    }
1411
1412    fn make_test_array(id: i32) -> Arc<NDArray> {
1413        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1414        arr.unique_id = id;
1415        Arc::new(arr)
1416    }
1417
1418    fn test_wiring() -> Arc<WiringRegistry> {
1419        Arc::new(WiringRegistry::new())
1420    }
1421
1422    /// Enable callbacks on a plugin handle (plugins default to disabled).
1423    fn enable_callbacks(handle: &PluginRuntimeHandle) {
1424        handle
1425            .port_runtime()
1426            .port_handle()
1427            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1428            .unwrap();
1429        std::thread::sleep(std::time::Duration::from_millis(10));
1430    }
1431
1432    #[test]
1433    fn test_passthrough_runtime() {
1434        let pool = Arc::new(NDArrayPool::new(1_000_000));
1435
1436        // Create downstream receiver
1437        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1438        let mut output = NDArrayOutput::new();
1439        output.add(downstream_sender);
1440
1441        let (handle, _data_jh) = create_plugin_runtime_with_output(
1442            "PASS1",
1443            PassthroughProcessor,
1444            pool,
1445            10,
1446            output,
1447            "",
1448            test_wiring(),
1449        );
1450        enable_callbacks(&handle);
1451
1452        // Send an array
1453        handle.array_sender().send(make_test_array(42));
1454
1455        // Should come out the other side
1456        let received = downstream_rx.blocking_recv().unwrap();
1457        assert_eq!(received.unique_id, 42);
1458    }
1459
1460    #[test]
1461    fn test_sink_runtime() {
1462        let pool = Arc::new(NDArrayPool::new(1_000_000));
1463
1464        let (handle, _data_jh) = create_plugin_runtime(
1465            "SINK1",
1466            SinkProcessor { count: 0 },
1467            pool,
1468            10,
1469            "",
1470            test_wiring(),
1471        );
1472        enable_callbacks(&handle);
1473
1474        // Send arrays - they should be consumed silently
1475        handle.array_sender().send(make_test_array(1));
1476        handle.array_sender().send(make_test_array(2));
1477
1478        // Give processing thread time
1479        std::thread::sleep(std::time::Duration::from_millis(50));
1480
1481        // No crash, no output needed
1482        assert_eq!(handle.port_name(), "SINK1");
1483    }
1484
1485    #[test]
1486    fn test_plugin_type_param() {
1487        let pool = Arc::new(NDArrayPool::new(1_000_000));
1488
1489        let (handle, _data_jh) = create_plugin_runtime(
1490            "TYPE_TEST",
1491            PassthroughProcessor,
1492            pool,
1493            10,
1494            "",
1495            test_wiring(),
1496        );
1497
1498        // Verify port name
1499        assert_eq!(handle.port_name(), "TYPE_TEST");
1500        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1501    }
1502
1503    #[test]
1504    fn test_shutdown_on_handle_drop() {
1505        let pool = Arc::new(NDArrayPool::new(1_000_000));
1506
1507        let (handle, data_jh) = create_plugin_runtime(
1508            "SHUTDOWN_TEST",
1509            PassthroughProcessor,
1510            pool,
1511            10,
1512            "",
1513            test_wiring(),
1514        );
1515
1516        // Drop the handle (closes sender channel, which should cause data thread to exit)
1517        let sender = handle.array_sender().clone();
1518        drop(handle);
1519        drop(sender);
1520
1521        // Data thread should terminate
1522        let result = data_jh.join();
1523        assert!(result.is_ok());
1524    }
1525
1526    #[test]
1527    fn test_dropped_count_when_queue_full() {
1528        let pool = Arc::new(NDArrayPool::new(1_000_000));
1529
1530        // Very slow processor
1531        struct SlowProcessor;
1532        impl NDPluginProcess for SlowProcessor {
1533            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1534                std::thread::sleep(std::time::Duration::from_millis(100));
1535                ProcessResult::empty()
1536            }
1537            fn plugin_type(&self) -> &str {
1538                "Slow"
1539            }
1540        }
1541
1542        let (handle, _data_jh) =
1543            create_plugin_runtime("DROP_TEST", SlowProcessor, pool, 1, "", test_wiring());
1544        enable_callbacks(&handle);
1545
1546        // Fill the queue and overflow
1547        for i in 0..10 {
1548            handle.array_sender().send(make_test_array(i));
1549        }
1550
1551        // Some should have been dropped
1552        assert!(handle.array_sender().dropped_count() > 0);
1553    }
1554
1555    #[test]
1556    fn test_blocking_callbacks_basic() {
1557        let pool = Arc::new(NDArrayPool::new(1_000_000));
1558        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1559        let mut output = NDArrayOutput::new();
1560        output.add(downstream_sender);
1561
1562        let (handle, _data_jh) = create_plugin_runtime_with_output(
1563            "BLOCK_TEST",
1564            PassthroughProcessor,
1565            pool,
1566            10,
1567            output,
1568            "",
1569            test_wiring(),
1570        );
1571        enable_callbacks(&handle);
1572
1573        // Enable blocking mode
1574        handle
1575            .port_runtime()
1576            .port_handle()
1577            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1578            .unwrap();
1579        std::thread::sleep(std::time::Duration::from_millis(50));
1580
1581        // In blocking mode, send() processes inline and returns synchronously
1582        handle.array_sender().send(make_test_array(42));
1583
1584        // Array should already be in the downstream channel
1585        let received = downstream_rx.blocking_recv().unwrap();
1586        assert_eq!(received.unique_id, 42);
1587    }
1588
1589    #[test]
1590    fn test_blocking_to_nonblocking_switch() {
1591        let pool = Arc::new(NDArrayPool::new(1_000_000));
1592        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1593        let mut output = NDArrayOutput::new();
1594        output.add(downstream_sender);
1595
1596        let (handle, _data_jh) = create_plugin_runtime_with_output(
1597            "SWITCH_TEST",
1598            PassthroughProcessor,
1599            pool,
1600            10,
1601            output,
1602            "",
1603            test_wiring(),
1604        );
1605        enable_callbacks(&handle);
1606
1607        // Start in blocking mode
1608        handle
1609            .port_runtime()
1610            .port_handle()
1611            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1612            .unwrap();
1613        std::thread::sleep(std::time::Duration::from_millis(50));
1614
1615        handle.array_sender().send(make_test_array(1));
1616        let received = downstream_rx.blocking_recv().unwrap();
1617        assert_eq!(received.unique_id, 1);
1618
1619        // Switch back to non-blocking
1620        handle
1621            .port_runtime()
1622            .port_handle()
1623            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1624            .unwrap();
1625        std::thread::sleep(std::time::Duration::from_millis(50));
1626
1627        // Send in non-blocking mode — goes through channel to data thread
1628        handle.array_sender().send(make_test_array(2));
1629        let received = downstream_rx.blocking_recv().unwrap();
1630        assert_eq!(received.unique_id, 2);
1631    }
1632
1633    #[test]
1634    fn test_enable_callbacks_disables_processing() {
1635        let pool = Arc::new(NDArrayPool::new(1_000_000));
1636        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1637        let mut output = NDArrayOutput::new();
1638        output.add(downstream_sender);
1639
1640        let (handle, _data_jh) = create_plugin_runtime_with_output(
1641            "ENABLE_TEST",
1642            PassthroughProcessor,
1643            pool,
1644            10,
1645            output,
1646            "",
1647            test_wiring(),
1648        );
1649
1650        // Disable callbacks
1651        handle
1652            .port_runtime()
1653            .port_handle()
1654            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1655            .unwrap();
1656        std::thread::sleep(std::time::Duration::from_millis(50));
1657
1658        // Send array — should be silently dropped by sender
1659        handle.array_sender().send(make_test_array(99));
1660
1661        // Verify nothing received (with timeout)
1662        let rt = tokio::runtime::Builder::new_current_thread()
1663            .enable_all()
1664            .build()
1665            .unwrap();
1666        let result = rt.block_on(async {
1667            tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1668        });
1669        assert!(
1670            result.is_err(),
1671            "should not receive array when callbacks disabled"
1672        );
1673    }
1674
1675    #[test]
1676    fn test_blocking_downstream_receives() {
1677        let pool = Arc::new(NDArrayPool::new(1_000_000));
1678
1679        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1680        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1681        let mut output = NDArrayOutput::new();
1682        output.add(ds1);
1683        output.add(ds2);
1684
1685        let (handle, _data_jh) = create_plugin_runtime_with_output(
1686            "BLOCK_DS_TEST",
1687            PassthroughProcessor,
1688            pool,
1689            10,
1690            output,
1691            "",
1692            test_wiring(),
1693        );
1694        enable_callbacks(&handle);
1695
1696        // Enable blocking mode
1697        handle
1698            .port_runtime()
1699            .port_handle()
1700            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1701            .unwrap();
1702        std::thread::sleep(std::time::Duration::from_millis(50));
1703
1704        handle.array_sender().send(make_test_array(77));
1705
1706        // Both downstream receivers should have the array
1707        let r1 = rx1.blocking_recv().unwrap();
1708        let r2 = rx2.blocking_recv().unwrap();
1709        assert_eq!(r1.unique_id, 77);
1710        assert_eq!(r2.unique_id, 77);
1711    }
1712
1713    #[test]
1714    fn test_blocking_param_updates() {
1715        let pool = Arc::new(NDArrayPool::new(1_000_000));
1716
1717        struct ParamTracker;
1718        impl NDPluginProcess for ParamTracker {
1719            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1720                ProcessResult::arrays(vec![Arc::new(array.clone())])
1721            }
1722            fn plugin_type(&self) -> &str {
1723                "ParamTracker"
1724            }
1725        }
1726
1727        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1728        let mut output = NDArrayOutput::new();
1729        output.add(downstream_sender);
1730
1731        let (handle, _data_jh) = create_plugin_runtime_with_output(
1732            "PARAM_TEST",
1733            ParamTracker,
1734            pool,
1735            10,
1736            output,
1737            "",
1738            test_wiring(),
1739        );
1740        enable_callbacks(&handle);
1741
1742        // Enable blocking mode
1743        handle
1744            .port_runtime()
1745            .port_handle()
1746            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1747            .unwrap();
1748        std::thread::sleep(std::time::Duration::from_millis(50));
1749
1750        // Send array in blocking mode
1751        handle.array_sender().send(make_test_array(1));
1752        let received = downstream_rx.blocking_recv().unwrap();
1753        assert_eq!(received.unique_id, 1);
1754
1755        // Write enable_callbacks while in blocking mode — should not crash
1756        handle
1757            .port_runtime()
1758            .port_handle()
1759            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1760            .unwrap();
1761        std::thread::sleep(std::time::Duration::from_millis(50));
1762
1763        // Still works after param update
1764        handle.array_sender().send(make_test_array(2));
1765        let received = downstream_rx.blocking_recv().unwrap();
1766        assert_eq!(received.unique_id, 2);
1767    }
1768
1769    /// Phase 0 regression test: process_and_publish inside a current-thread runtime must not panic.
1770    #[test]
1771    fn test_no_panic_in_current_thread_runtime() {
1772        let pool = Arc::new(NDArrayPool::new(1_000_000));
1773        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1774        let mut output = NDArrayOutput::new();
1775        output.add(downstream_sender);
1776
1777        let (handle, _data_jh) = create_plugin_runtime_with_output(
1778            "CURRENT_THREAD_TEST",
1779            PassthroughProcessor,
1780            pool,
1781            10,
1782            output,
1783            "",
1784            test_wiring(),
1785        );
1786        enable_callbacks(&handle);
1787
1788        // Enable blocking mode so process_and_publish runs inline
1789        handle
1790            .port_runtime()
1791            .port_handle()
1792            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1793            .unwrap();
1794        std::thread::sleep(std::time::Duration::from_millis(50));
1795
1796        // Call send (which calls process_and_publish inline) from inside a current-thread runtime
1797        let rt = tokio::runtime::Builder::new_current_thread()
1798            .enable_all()
1799            .build()
1800            .unwrap();
1801        rt.block_on(async {
1802            handle.array_sender().send(make_test_array(99));
1803        });
1804
1805        let received = downstream_rx.blocking_recv().unwrap();
1806        assert_eq!(received.unique_id, 99);
1807    }
1808
1809    #[test]
1810    fn test_sort_buffer_reorders_by_unique_id() {
1811        let mut buf = SortBuffer::new();
1812
1813        // Insert out of order: 3, 1, 2
1814        buf.insert(3, vec![make_test_array(3)], 10);
1815        buf.insert(1, vec![make_test_array(1)], 10);
1816        buf.insert(2, vec![make_test_array(2)], 10);
1817
1818        assert_eq!(buf.len(), 3);
1819
1820        let drained = buf.drain_all();
1821        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1822        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
1823        assert_eq!(buf.len(), 0);
1824        assert_eq!(buf.last_emitted_id, 3);
1825    }
1826
1827    #[test]
1828    fn test_sort_buffer_detects_disordered() {
1829        let mut buf = SortBuffer::new();
1830
1831        // Emit id=5, then insert id=3 (which is less than last_emitted_id)
1832        buf.insert(5, vec![make_test_array(5)], 10);
1833        buf.drain_all(); // emits id=5, last_emitted_id=5
1834
1835        buf.insert(3, vec![make_test_array(3)], 10);
1836        assert_eq!(buf.disordered_arrays, 1);
1837    }
1838
1839    #[test]
1840    fn test_sort_buffer_drops_when_full() {
1841        let mut buf = SortBuffer::new();
1842
1843        // sort_size=2, insert 3 entries
1844        buf.insert(1, vec![make_test_array(1)], 2);
1845        buf.insert(2, vec![make_test_array(2)], 2);
1846        buf.insert(3, vec![make_test_array(3)], 2);
1847
1848        // Buffer should have 2 entries (oldest dropped)
1849        assert_eq!(buf.len(), 2);
1850        assert_eq!(buf.dropped_output_arrays, 1);
1851
1852        let drained = buf.drain_all();
1853        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1854        assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
1855    }
1856
1857    #[test]
1858    fn test_sort_mode_runtime_integration() {
1859        let pool = Arc::new(NDArrayPool::new(1_000_000));
1860        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1861        let mut output = NDArrayOutput::new();
1862        output.add(downstream_sender);
1863
1864        let (handle, _data_jh) = create_plugin_runtime_with_output(
1865            "SORT_TEST",
1866            PassthroughProcessor,
1867            pool,
1868            10,
1869            output,
1870            "",
1871            test_wiring(),
1872        );
1873        enable_callbacks(&handle);
1874
1875        // Enable sort mode with sort_size=10
1876        handle
1877            .port_runtime()
1878            .port_handle()
1879            .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
1880            .unwrap();
1881        handle
1882            .port_runtime()
1883            .port_handle()
1884            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
1885            .unwrap();
1886        std::thread::sleep(std::time::Duration::from_millis(50));
1887
1888        // Send arrays out of order
1889        handle.array_sender().send(make_test_array(3));
1890        handle.array_sender().send(make_test_array(1));
1891        handle.array_sender().send(make_test_array(2));
1892        std::thread::sleep(std::time::Duration::from_millis(100));
1893
1894        // Arrays should be buffered, not yet received
1895        let rt = tokio::runtime::Builder::new_current_thread()
1896            .enable_all()
1897            .build()
1898            .unwrap();
1899        let result = rt.block_on(async {
1900            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
1901        });
1902        assert!(
1903            result.is_err(),
1904            "arrays should be buffered while sort mode is active"
1905        );
1906
1907        // Disable sort mode — should flush all buffered arrays in order
1908        handle
1909            .port_runtime()
1910            .port_handle()
1911            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 0)
1912            .unwrap();
1913        std::thread::sleep(std::time::Duration::from_millis(100));
1914
1915        // Receive all flushed arrays — they should arrive in sorted order
1916        let r1 = downstream_rx.blocking_recv().unwrap();
1917        let r2 = downstream_rx.blocking_recv().unwrap();
1918        let r3 = downstream_rx.blocking_recv().unwrap();
1919        assert_eq!(r1.unique_id, 1);
1920        assert_eq!(r2.unique_id, 2);
1921        assert_eq!(r3.unique_id, 3);
1922    }
1923}