Skip to main content

ad_core_rs/plugin/
runtime.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
19use super::params::PluginBaseParams;
20use super::wiring::WiringRegistry;
21
/// A parameter value carried over the param change channel from the control
/// plane to the data plane.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// 32-bit signed integer payload.
    Int32(i32),
    /// 64-bit floating point payload.
    Float64(f64),
    /// String payload.
    Octet(String),
}

impl ParamChangeValue {
    /// Numeric view of the value as `i32`.
    ///
    /// `Float64` is converted with an `as` cast (fraction discarded, out-of-range
    /// values saturate, NaN becomes 0). `Octet` always yields 0.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(int) => int,
            Self::Float64(float) => float as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Numeric view of the value as `f64`.
    ///
    /// `Int32` is widened losslessly. `Octet` always yields 0.0.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(int) => f64::from(int),
            Self::Float64(float) => float,
            Self::Octet(_) => 0.0,
        }
    }

    /// Borrow the string payload, or `None` when the value is numeric.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
54
/// One parameter write-back requested by a plugin (e.g. from `process_array`).
///
/// Every variant carries the param index (`reason`), the target address
/// (`addr`) and the new value.
pub enum ParamUpdate {
    /// Set an Int32 param.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// Set a Float64 param.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// Set a string (octet) param.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    /// Set a Float64 array param.
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Int32 update targeting addr 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Float64 update targeting addr 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Int32 update targeting an explicit addr.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 { reason, addr, value }
    }

    /// Float64 update targeting an explicit addr.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 { reason, addr, value }
    }

    /// Float64Array update targeting addr 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Float64Array update targeting an explicit addr.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array { reason, addr, value }
    }
}
129
130/// Result of processing one array: output arrays + param updates to write back.
131pub struct ProcessResult {
132    pub output_arrays: Vec<Arc<NDArray>>,
133    pub param_updates: Vec<ParamUpdate>,
134    /// If set, only publish to the subscriber at this index (round-robin scatter).
135    pub scatter_index: Option<usize>,
136}
137
138impl ProcessResult {
139    /// Convenience: sink plugin with only param updates, no output arrays.
140    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
141        Self {
142            output_arrays: vec![],
143            param_updates,
144            scatter_index: None,
145        }
146    }
147
148    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
149    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
150        Self {
151            output_arrays,
152            param_updates: vec![],
153            scatter_index: None,
154        }
155    }
156
157    /// Convenience: no outputs, no param updates.
158    pub fn empty() -> Self {
159        Self {
160            output_arrays: vec![],
161            param_updates: vec![],
162            scatter_index: None,
163        }
164    }
165
166    /// Convenience: scatter output — send to a single subscriber by index.
167    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
168        Self {
169            output_arrays,
170            param_updates: vec![],
171            scatter_index: Some(index),
172        }
173    }
174}
175
176/// Result of handling a control-plane param change.
177pub struct ParamChangeResult {
178    pub output_arrays: Vec<Arc<NDArray>>,
179    pub param_updates: Vec<ParamUpdate>,
180}
181
182impl ParamChangeResult {
183    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
184        Self {
185            output_arrays: vec![],
186            param_updates,
187        }
188    }
189
190    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
191        Self {
192            output_arrays,
193            param_updates: vec![],
194        }
195    }
196
197    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
198        Self {
199            output_arrays,
200            param_updates,
201        }
202    }
203
204    pub fn empty() -> Self {
205        Self {
206            output_arrays: vec![],
207            param_updates: vec![],
208        }
209    }
210}
211
/// Pure processing logic. No threading concerns.
///
/// Implementors supply the per-array work (`process_array`) and a type name;
/// the remaining methods have default implementations that do nothing.
pub trait NDPluginProcess: Send + 'static {
    /// Process one array. Return output arrays and param updates.
    ///
    /// `pool` is the array pool owned by the surrounding processor state.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Plugin type name for PLUGIN_TYPE param.
    fn plugin_type(&self) -> &str;

    /// Register plugin-specific params on the base. Called once during construction.
    ///
    /// The default implementation registers nothing and returns `Ok(())`.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called when a param changes. Reason is the param index.
    /// Return param updates to be written back to the port driver.
    ///
    /// The default implementation ignores the change and returns an empty result.
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Return a handle to the latest NDArray data for array reads.
    /// Override this in plugins like NDPluginStdArrays that serve pixel data
    /// via readInt8Array/readInt16Array/etc.
    ///
    /// The default implementation returns `None` (no array data served).
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
245
/// Read-only snapshot of param values available to the processing thread.
///
/// Passed by reference to `NDPluginProcess::on_param_change`; it describes a
/// single changed param (reason, addr, new value).
pub struct PluginParamSnapshot {
    /// Enable-callbacks flag captured with the snapshot.
    // NOTE(review): presumably mirrors the plugin's EnableCallbacks param — confirm at the construction site.
    pub enable_callbacks: bool,
    /// The param reason that changed.
    pub reason: usize,
    /// The address (sub-device) that changed.
    pub addr: i32,
    /// The new value.
    pub value: ParamChangeValue,
}
256
257/// Sort buffer for reordering output arrays by uniqueId.
258///
259/// When sort_mode is enabled, output arrays are inserted into a BTreeMap
260/// keyed by uniqueId instead of being sent directly. A periodic flush task
261/// drains arrays in uniqueId order.
262struct SortBuffer {
263    /// Buffered arrays keyed by uniqueId, ordered by BTreeMap.
264    entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
265    /// The last uniqueId that was emitted (for detecting disordered arrays).
266    last_emitted_id: i32,
267    /// Counter of arrays received out of order (uniqueId < last_emitted_id).
268    disordered_arrays: i32,
269    /// Counter of arrays dropped because the buffer was full.
270    dropped_output_arrays: i32,
271}
272
273impl SortBuffer {
274    fn new() -> Self {
275        Self {
276            entries: BTreeMap::new(),
277            last_emitted_id: 0,
278            disordered_arrays: 0,
279            dropped_output_arrays: 0,
280        }
281    }
282
283    /// Insert arrays into the sort buffer. If buffer exceeds sort_size, drop oldest entries.
284    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
285        if unique_id < self.last_emitted_id {
286            self.disordered_arrays += 1;
287        }
288        self.entries.entry(unique_id).or_default().extend(arrays);
289
290        // Enforce sort_size limit by dropping oldest entries
291        while sort_size > 0 && self.entries.len() as i32 > sort_size {
292            if let Some((&oldest_key, _)) = self.entries.iter().next() {
293                self.entries.remove(&oldest_key);
294                self.dropped_output_arrays += 1;
295            }
296        }
297    }
298
299    /// Drain all buffered arrays in uniqueId order. Returns them as (uniqueId, arrays) pairs.
300    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
301        let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
302        if let Some(&(last_id, _)) = entries.last() {
303            self.last_emitted_id = last_id;
304        }
305        entries
306    }
307
308    /// Number of uniqueId entries currently buffered.
309    fn len(&self) -> i32 {
310        self.entries.len() as i32
311    }
312}
313
/// Shared processor state protected by a mutex, accessible from both
/// the data thread (non-blocking mode) and the caller thread (blocking mode).
struct SharedProcessorInner<P: NDPluginProcess> {
    /// The plugin's processing logic; `process_array` is invoked on it for each array.
    processor: P,
    /// Shared downstream output.
    // NOTE(review): not touched by the methods on this type — presumably used by the publishing side; confirm.
    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Array pool handed to `process_array`.
    pool: Arc<NDArrayPool>,
    /// Param indices for the NDArray driver params reported after each array.
    ndarray_params: NDArrayDriverParams,
    /// Param indices for the plugin base params (execution time, sort statistics, ...).
    plugin_params: PluginBaseParams,
    /// Port handle used to fire array-data interrupts directly.
    port_handle: PortHandle,
    /// Running count of arrays reported; incremented each time array params are built.
    array_counter: i32,
    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
    std_array_data_param: Option<usize>,
    /// MinCallbackTime throttling: minimum seconds between process calls.
    /// Values <= 0 disable throttling.
    min_callback_time: f64,
    /// Last time process_and_publish was called (for throttling).
    last_process_time: Option<std::time::Instant>,
    /// Sort mode: 0 = disabled, 1 = sorted output.
    sort_mode: i32,
    /// Sort time: seconds between periodic flushes of the sort buffer.
    sort_time: f64,
    /// Sort size: maximum number of uniqueId entries in the sort buffer.
    sort_size: i32,
    /// Sort buffer for reordering output arrays by uniqueId.
    sort_buffer: SortBuffer,
}
339
340impl<P: NDPluginProcess> SharedProcessorInner<P> {
341    fn should_throttle(&self) -> bool {
342        if self.min_callback_time <= 0.0 {
343            return false;
344        }
345        if let Some(last) = self.last_process_time {
346            last.elapsed().as_secs_f64() < self.min_callback_time
347        } else {
348            false
349        }
350    }
351
352    /// Process array and return a `ProcessOutput`. Does NOT send to actor.
353    /// Direct interrupts (std_array_data_param) happen here (sync).
354    /// The returned output must be published and flushed by the caller in async context.
355    fn process_and_publish(&mut self, array: &NDArray) -> Option<ProcessOutput> {
356        if self.should_throttle() {
357            return None;
358        }
359        let t0 = std::time::Instant::now();
360        let result = self.processor.process_array(array, &self.pool);
361        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
362        self.last_process_time = Some(t0);
363
364        if self.sort_mode != 0 && !result.output_arrays.is_empty() {
365            let unique_id = array.unique_id;
366            self.sort_buffer
367                .insert(unique_id, result.output_arrays, self.sort_size);
368            let mut batch = self.build_sort_params_batch();
369            if !result.param_updates.is_empty() {
370                let frame_output = self.build_publish_batch(
371                    vec![],
372                    result.param_updates,
373                    result.scatter_index,
374                    Some(array),
375                    elapsed_ms,
376                );
377                batch.merge(frame_output.batch);
378            }
379            Some(ProcessOutput {
380                arrays: vec![],
381                scatter_index: None,
382                batch,
383            })
384        } else {
385            Some(self.build_publish_batch(
386                result.output_arrays,
387                result.param_updates,
388                result.scatter_index,
389                Some(array),
390                elapsed_ms,
391            ))
392        }
393    }
394
395    /// Flush the sort buffer: drain all arrays in uniqueId order.
396    fn flush_sort_buffer(&mut self) -> ProcessOutput {
397        let entries = self.sort_buffer.drain_all();
398        let mut all_arrays = Vec::new();
399        let mut combined = ParamBatch::empty();
400        for (_unique_id, arrays) in entries {
401            let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
402            all_arrays.extend(output.arrays);
403            combined.merge(output.batch);
404        }
405        combined.merge(self.build_sort_params_batch());
406        ProcessOutput {
407            arrays: all_arrays,
408            scatter_index: None,
409            batch: combined,
410        }
411    }
412
413    fn build_sort_params_batch(&self) -> ParamBatch {
414        use asyn_rs::request::ParamSetValue;
415        let sort_free = self.sort_size - self.sort_buffer.len();
416        ParamBatch {
417            addr0: vec![
418                ParamSetValue::Int32 {
419                    reason: self.plugin_params.sort_free,
420                    addr: 0,
421                    value: sort_free,
422                },
423                ParamSetValue::Int32 {
424                    reason: self.plugin_params.disordered_arrays,
425                    addr: 0,
426                    value: self.sort_buffer.disordered_arrays,
427                },
428                ParamSetValue::Int32 {
429                    reason: self.plugin_params.dropped_output_arrays,
430                    addr: 0,
431                    value: self.sort_buffer.dropped_output_arrays,
432                },
433            ],
434            extra: std::collections::HashMap::new(),
435        }
436    }
437
438    /// Build a ProcessOutput: fires direct interrupts (sync) and collects
439    /// param updates into a batch. Does NOT publish arrays — the caller
440    /// must publish them in async context.
441    fn build_publish_batch(
442        &mut self,
443        output_arrays: Vec<Arc<NDArray>>,
444        param_updates: Vec<ParamUpdate>,
445        scatter_index: Option<usize>,
446        fallback_array: Option<&NDArray>,
447        elapsed_ms: f64,
448    ) -> ProcessOutput {
449        use asyn_rs::request::ParamSetValue;
450
451        let mut addr0: Vec<ParamSetValue> = Vec::new();
452        let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
453            std::collections::HashMap::new();
454
455        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
456            self.array_counter += 1;
457
458            // Fire array data interrupt directly (C EPICS pattern).
459            if let Some(param) = self.std_array_data_param {
460                use crate::ndarray::NDDataBuffer;
461                use asyn_rs::param::ParamValue;
462                let value = match &report_arr.data {
463                    NDDataBuffer::I8(v) => {
464                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
465                    }
466                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
467                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
468                    ))),
469                    NDDataBuffer::I16(v) => {
470                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
471                    }
472                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
473                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
474                    ))),
475                    NDDataBuffer::I32(v) => {
476                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
477                    }
478                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
479                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
480                    ))),
481                    NDDataBuffer::I64(v) => {
482                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
483                    }
484                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
485                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
486                    ))),
487                    NDDataBuffer::F32(v) => {
488                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
489                    }
490                    NDDataBuffer::F64(v) => {
491                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
492                    }
493                };
494                if let Some(value) = value {
495                    let ts = report_arr.timestamp.to_system_time();
496                    self.port_handle
497                        .interrupts()
498                        .notify(asyn_rs::interrupt::InterruptValue {
499                            reason: param,
500                            addr: 0,
501                            value,
502                            timestamp: ts,
503                            uint32_changed_mask: 0,
504                            ..Default::default()
505                        });
506                }
507            }
508
509            let info = report_arr.info();
510            let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
511            addr0.extend([
512                ParamSetValue::Int32 {
513                    reason: self.ndarray_params.array_counter,
514                    addr: 0,
515                    value: self.array_counter,
516                },
517                ParamSetValue::Int32 {
518                    reason: self.ndarray_params.unique_id,
519                    addr: 0,
520                    value: report_arr.unique_id,
521                },
522                ParamSetValue::Int32 {
523                    reason: self.ndarray_params.n_dimensions,
524                    addr: 0,
525                    value: report_arr.dims.len() as i32,
526                },
527                ParamSetValue::Int32 {
528                    reason: self.ndarray_params.array_size_x,
529                    addr: 0,
530                    value: info.x_size as i32,
531                },
532                ParamSetValue::Int32 {
533                    reason: self.ndarray_params.array_size_y,
534                    addr: 0,
535                    value: info.y_size as i32,
536                },
537                ParamSetValue::Int32 {
538                    reason: self.ndarray_params.array_size_z,
539                    addr: 0,
540                    value: info.color_size as i32,
541                },
542                ParamSetValue::Int32 {
543                    reason: self.ndarray_params.array_size,
544                    addr: 0,
545                    value: info.total_bytes as i32,
546                },
547                ParamSetValue::Int32 {
548                    reason: self.ndarray_params.data_type,
549                    addr: 0,
550                    value: report_arr.data.data_type() as i32,
551                },
552                ParamSetValue::Int32 {
553                    reason: self.ndarray_params.color_mode,
554                    addr: 0,
555                    value: color_mode,
556                },
557                ParamSetValue::Float64 {
558                    reason: self.ndarray_params.timestamp_rbv,
559                    addr: 0,
560                    value: report_arr.timestamp.as_f64(),
561                },
562                ParamSetValue::Int32 {
563                    reason: self.ndarray_params.epics_ts_sec,
564                    addr: 0,
565                    value: report_arr.timestamp.sec as i32,
566                },
567                ParamSetValue::Int32 {
568                    reason: self.ndarray_params.epics_ts_nsec,
569                    addr: 0,
570                    value: report_arr.timestamp.nsec as i32,
571                },
572            ]);
573        }
574
575        addr0.push(ParamSetValue::Float64 {
576            reason: self.plugin_params.execution_time,
577            addr: 0,
578            value: elapsed_ms,
579        });
580
581        // ArrayRate_RBV is computed by a calc record in the DB template
582        // (SCAN "1 second", reading ArrayCounter_RBV delta), not in Rust.
583
584        // Plugin-specific param updates.
585        for update in &param_updates {
586            match update {
587                ParamUpdate::Int32 {
588                    reason,
589                    addr,
590                    value,
591                } => {
592                    let pv = ParamSetValue::Int32 {
593                        reason: *reason,
594                        addr: *addr,
595                        value: *value,
596                    };
597                    if *addr == 0 {
598                        addr0.push(pv);
599                    } else {
600                        extra.entry(*addr).or_default().push(pv);
601                    }
602                }
603                ParamUpdate::Float64 {
604                    reason,
605                    addr,
606                    value,
607                } => {
608                    let pv = ParamSetValue::Float64 {
609                        reason: *reason,
610                        addr: *addr,
611                        value: *value,
612                    };
613                    if *addr == 0 {
614                        addr0.push(pv);
615                    } else {
616                        extra.entry(*addr).or_default().push(pv);
617                    }
618                }
619                ParamUpdate::Octet {
620                    reason,
621                    addr,
622                    value,
623                } => {
624                    let pv = ParamSetValue::Octet {
625                        reason: *reason,
626                        addr: *addr,
627                        value: value.clone(),
628                    };
629                    if *addr == 0 {
630                        addr0.push(pv);
631                    } else {
632                        extra.entry(*addr).or_default().push(pv);
633                    }
634                }
635                ParamUpdate::Float64Array {
636                    reason,
637                    addr,
638                    value,
639                } => {
640                    let pv = ParamSetValue::Float64Array {
641                        reason: *reason,
642                        addr: *addr,
643                        value: value.clone(),
644                    };
645                    if *addr == 0 {
646                        addr0.push(pv);
647                    } else {
648                        extra.entry(*addr).or_default().push(pv);
649                    }
650                }
651            }
652        }
653
654        ProcessOutput {
655            arrays: output_arrays,
656            scatter_index,
657            batch: ParamBatch { addr0, extra },
658        }
659    }
660}
661
662/// Output from processing: arrays to publish + param batch to flush.
663struct ProcessOutput {
664    arrays: Vec<Arc<NDArray>>,
665    scatter_index: Option<usize>,
666    batch: ParamBatch,
667}
668
669impl ProcessOutput {
670    /// Publish arrays to downstream senders (async, concurrent fan-out).
671    ///
672    /// Each array is published to all senders concurrently (independent
673    /// backpressure per sender). Arrays are published in order — the next
674    /// array's fan-out starts only after the previous one completes.
675    async fn publish_arrays(&self, senders: &[NDArraySender]) {
676        for arr in &self.arrays {
677            if let Some(idx) = self.scatter_index {
678                if let Some(sender) = senders.get(idx % senders.len().max(1)) {
679                    sender.publish(arr.clone()).await;
680                }
681            } else {
682                let futs = senders.iter().map(|s| s.publish(arr.clone()));
683                futures_util::future::join_all(futs).await;
684            }
685        }
686    }
687}
688
689/// Collected param updates ready to be flushed to the actor.
690/// Produced by `build_publish_batch()`, consumed by async `flush()`.
691struct ParamBatch {
692    addr0: Vec<asyn_rs::request::ParamSetValue>,
693    extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
694}
695
696impl ParamBatch {
697    fn empty() -> Self {
698        Self {
699            addr0: Vec::new(),
700            extra: std::collections::HashMap::new(),
701        }
702    }
703
704    fn merge(&mut self, other: ParamBatch) {
705        self.addr0.extend(other.addr0);
706        for (addr, updates) in other.extra {
707            self.extra.entry(addr).or_default().extend(updates);
708        }
709    }
710
711    /// Flush via reliable async enqueue. Call from async context.
712    async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
713        if !self.addr0.is_empty() {
714            if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
715                eprintln!("plugin param flush error (addr 0): {e}");
716            }
717        }
718        for (addr, updates) in self.extra {
719            if let Err(e) = port.set_params_and_notify(addr, updates).await {
720                eprintln!("plugin param flush error (addr {addr}): {e}");
721            }
722        }
723    }
724}
725
/// PortDriver implementation for a plugin's control plane.
#[allow(dead_code)]
pub struct PluginPortDriver {
    /// Underlying port driver base holding the param library.
    base: PortDriverBase,
    /// Param indices of the NDArray driver params created on `base`.
    ndarray_params: NDArrayDriverParams,
    /// Param indices of the plugin base params created on `base`.
    plugin_params: PluginBaseParams,
    /// Sender half of the param change channel; items are (reason, addr, value).
    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
    std_array_data_param: Option<usize>,
}
738
739impl PluginPortDriver {
740    fn new<P: NDPluginProcess>(
741        port_name: &str,
742        plugin_type_name: &str,
743        queue_size: usize,
744        ndarray_port: &str,
745        max_addr: usize,
746        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
747        processor: &mut P,
748        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
749    ) -> AsynResult<Self> {
750        let mut base = PortDriverBase::new(
751            port_name,
752            max_addr,
753            PortFlags {
754                can_block: true,
755                ..Default::default()
756            },
757        );
758
759        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
760        let plugin_params = PluginBaseParams::create(&mut base)?;
761
762        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
763        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
764        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
765        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
766        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
767        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
768        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
769        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
770        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
771        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
772        base.set_int32_param(ndarray_params.capture, 0, 0)?;
773        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
774        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
775        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
776        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
777        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
778        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
779        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
780        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
781        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
782        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
783
784        // Set plugin identity params
785        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
786        base.set_string_param(
787            ndarray_params.ad_core_version,
788            0,
789            env!("CARGO_PKG_VERSION").into(),
790        )?;
791        base.set_string_param(
792            ndarray_params.driver_version,
793            0,
794            env!("CARGO_PKG_VERSION").into(),
795        )?;
796        if !ndarray_port.is_empty() {
797            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
798        }
799
800        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
801        let std_array_data_param = if array_data.is_some() {
802            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
803        } else {
804            None
805        };
806
807        // Let the processor register its plugin-specific params
808        processor.register_params(&mut base)?;
809
810        Ok(Self {
811            base,
812            ndarray_params,
813            plugin_params,
814            param_change_tx,
815            array_data,
816            std_array_data_param,
817        })
818    }
819}
820
/// Copy as many leading elements of `src` as fit into `dst` without any
/// conversion, and return how many elements were copied
/// (`min(src.len(), dst.len())`). Elements of `dst` past that count are left
/// untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = usize::min(src.len(), dst.len());
    let (target, _) = dst.split_at_mut(count);
    target.copy_from_slice(&src[..count]);
    count
}
827
828/// Convert and copy source slice into destination buffer element-by-element.
829fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
830where
831    S: CastToF64 + Copy,
832    D: CastFromF64 + Copy,
833{
834    let n = src.len().min(dst.len());
835    for i in 0..n {
836        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
837    }
838    n
839}
840
/// Widening of a numeric element to `f64` with the semantics of an `as f64`
/// cast (64-bit integers may lose precision).
trait CastToF64 {
    fn cast_to_f64(self) -> f64;
}

/// Generates `CastToF64` impls that forward to a plain `as f64` cast.
macro_rules! impl_cast_to_f64 {
    ($($ty:ty),* $(,)?) => {
        $(
            impl CastToF64 for $ty {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )*
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

// `f64` is already the target type, so the value passes through unchanged.
impl CastToF64 for f64 {
    fn cast_to_f64(self) -> f64 {
        self
    }
}
896
/// Narrowing of an `f64` to an output element type with the semantics of an
/// `as` cast (float-to-integer `as` casts saturate and map NaN to zero).
trait CastFromF64 {
    fn cast_from_f64(v: f64) -> Self;
}

/// Generates `CastFromF64` impls that forward to a plain `as` cast.
macro_rules! impl_cast_from_f64 {
    ($($ty:ty),* $(,)?) => {
        $(
            impl CastFromF64 for $ty {
                fn cast_from_f64(v: f64) -> Self {
                    v as $ty
                }
            }
        )*
    };
}

impl_cast_from_f64!(i8, i16, i32, f32);

// `f64` needs no conversion at all.
impl CastFromF64 for f64 {
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
927
/// Expands to the body of a `read_*_array` method.
///
/// The array currently stored behind `$self.array_data` is copied into `$buf`:
/// the `$direct_variant` buffer is copied as-is, every other listed variant is
/// converted element by element. The expansion returns `Ok(0)` from the
/// enclosing function when there is no handle or no array stored, otherwise it
/// evaluates to `Ok(n)` with the number of elements copied.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        let Some(handle) = &$self.array_data else {
            return Ok(0);
        };
        // The guard stays alive for the whole copy.
        let guard = handle.lock();
        let Some(array) = &*guard else {
            return Ok(0);
        };
        let copied = match &array.data {
            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
        };
        Ok(copied)
    }};
}
949
950impl PortDriver for PluginPortDriver {
951    fn base(&self) -> &PortDriverBase {
952        &self.base
953    }
954
955    fn base_mut(&mut self) -> &mut PortDriverBase {
956        &mut self.base
957    }
958
959    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
960        let reason = user.reason;
961        let addr = user.addr;
962        self.base.set_int32_param(reason, addr, value)?;
963        self.base.call_param_callbacks(addr)?;
964        let _ = self
965            .param_change_tx
966            .try_send((reason, addr, ParamChangeValue::Int32(value)));
967        Ok(())
968    }
969
970    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
971        let reason = user.reason;
972        let addr = user.addr;
973        self.base.set_float64_param(reason, addr, value)?;
974        self.base.call_param_callbacks(addr)?;
975        let _ = self
976            .param_change_tx
977            .try_send((reason, addr, ParamChangeValue::Float64(value)));
978        Ok(())
979    }
980
981    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
982        let reason = user.reason;
983        let addr = user.addr;
984        let s = String::from_utf8_lossy(data).into_owned();
985        self.base.set_string_param(reason, addr, s.clone())?;
986        self.base.call_param_callbacks(addr)?;
987        let _ = self
988            .param_change_tx
989            .try_send((reason, addr, ParamChangeValue::Octet(s)));
990        Ok(())
991    }
992
993    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
994        impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
995    }
996
997    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
998        impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
999    }
1000
1001    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1002        impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
1003    }
1004
1005    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1006        impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
1007    }
1008
1009    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1010        impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
1011    }
1012}
1013
/// Handle to a running plugin runtime. Provides access to sender and port handle.
///
/// The handle is cheap to clone; clones refer to the same port runtime, input
/// channel and output list.
#[derive(Clone)]
pub struct PluginRuntimeHandle {
    /// Control-plane runtime (parameter I/O) created for this plugin's port.
    port_runtime: PortRuntimeHandle,
    /// Sender side of the plugin's input array channel.
    array_sender: NDArraySender,
    /// Downstream senders the plugin publishes to; shared with the data thread.
    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Name of the plugin's port.
    port_name: String,
    /// Parameter indices of the common NDArray driver parameters.
    pub ndarray_params: NDArrayDriverParams,
    /// Parameter indices of the common plugin parameters (e.g. `enable_callbacks`).
    pub plugin_params: PluginBaseParams,
}
1024
1025impl PluginRuntimeHandle {
1026    pub fn port_runtime(&self) -> &PortRuntimeHandle {
1027        &self.port_runtime
1028    }
1029
1030    pub fn array_sender(&self) -> &NDArraySender {
1031        &self.array_sender
1032    }
1033
1034    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
1035        &self.array_output
1036    }
1037
1038    pub fn port_name(&self) -> &str {
1039        &self.port_name
1040    }
1041}
1042
1043/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
1044///
1045/// Returns:
1046/// - `PluginRuntimeHandle` for wiring and control
1047/// - `PortRuntimeHandle` for param I/O
1048/// - `JoinHandle` for the data processing thread
1049pub fn create_plugin_runtime<P: NDPluginProcess>(
1050    port_name: &str,
1051    processor: P,
1052    pool: Arc<NDArrayPool>,
1053    queue_size: usize,
1054    ndarray_port: &str,
1055    wiring: Arc<WiringRegistry>,
1056) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1057    create_plugin_runtime_multi_addr(
1058        port_name,
1059        processor,
1060        pool,
1061        queue_size,
1062        ndarray_port,
1063        wiring,
1064        1,
1065    )
1066}
1067
1068/// Create a plugin runtime with multi-addr support.
1069///
1070/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
1071pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1072    port_name: &str,
1073    mut processor: P,
1074    pool: Arc<NDArrayPool>,
1075    queue_size: usize,
1076    ndarray_port: &str,
1077    wiring: Arc<WiringRegistry>,
1078    max_addr: usize,
1079) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1080    // Param change channel (control plane -> data plane)
1081    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1082
1083    // Capture plugin type and array data handle before mutable borrow
1084    let plugin_type_name = processor.plugin_type().to_string();
1085    let array_data = processor.array_data_handle();
1086
1087    // Create the port driver for control plane
1088    let driver = PluginPortDriver::new(
1089        port_name,
1090        &plugin_type_name,
1091        queue_size,
1092        ndarray_port,
1093        max_addr,
1094        param_tx,
1095        &mut processor,
1096        array_data,
1097    )
1098    .expect("failed to create plugin port driver");
1099
1100    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1101    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1102    let min_callback_time_reason = driver.plugin_params.min_callback_time;
1103    let sort_mode_reason = driver.plugin_params.sort_mode;
1104    let sort_time_reason = driver.plugin_params.sort_time;
1105    let sort_size_reason = driver.plugin_params.sort_size;
1106    let ndarray_params = driver.ndarray_params;
1107    let plugin_params = driver.plugin_params;
1108    let std_array_data_param = driver.std_array_data_param;
1109
1110    // Create port runtime (actor thread for param I/O)
1111    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1112
1113    // Clone port handle for the data thread to write params back
1114    let port_handle = port_runtime.port_handle().clone();
1115
1116    // Array channel (data plane)
1117    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1118
1119    // Shared mode flags
1120    let enabled = Arc::new(AtomicBool::new(false));
1121    let blocking_mode = Arc::new(AtomicBool::new(false));
1122
1123    // Shared processor (accessible from data thread)
1124    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1125    let array_output_for_handle = array_output.clone();
1126    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1127        processor,
1128        output: array_output,
1129        pool,
1130        ndarray_params,
1131        plugin_params,
1132        port_handle,
1133        array_counter: 0,
1134        std_array_data_param,
1135        min_callback_time: 0.0,
1136        last_process_time: None,
1137        sort_mode: 0,
1138        sort_time: 0.0,
1139        sort_size: 10,
1140        sort_buffer: SortBuffer::new(),
1141    }));
1142
1143    let data_enabled = enabled.clone();
1144    let data_blocking = blocking_mode.clone();
1145
1146    let mut array_sender = array_sender;
1147    array_sender.set_mode_flags(enabled, blocking_mode);
1148
1149    // Capture wiring info for data loop
1150    let nd_array_port_reason = plugin_params.nd_array_port;
1151    let sender_port_name = port_name.to_string();
1152    let initial_upstream = ndarray_port.to_string();
1153
1154    // Spawn data processing thread
1155    let data_jh = thread::Builder::new()
1156        .name(format!("plugin-data-{port_name}"))
1157        .spawn(move || {
1158            plugin_data_loop(
1159                shared,
1160                array_rx,
1161                param_rx,
1162                enable_callbacks_reason,
1163                blocking_callbacks_reason,
1164                min_callback_time_reason,
1165                sort_mode_reason,
1166                sort_time_reason,
1167                sort_size_reason,
1168                data_enabled,
1169                data_blocking,
1170                nd_array_port_reason,
1171                sender_port_name,
1172                initial_upstream,
1173                wiring,
1174            );
1175        })
1176        .expect("failed to spawn plugin data thread");
1177
1178    let handle = PluginRuntimeHandle {
1179        port_runtime,
1180        array_sender,
1181        array_output: array_output_for_handle,
1182        port_name: port_name.to_string(),
1183        ndarray_params,
1184        plugin_params,
1185    };
1186
1187    (handle, data_jh)
1188}
1189
1190fn plugin_data_loop<P: NDPluginProcess>(
1191    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1192    mut array_rx: NDArrayReceiver,
1193    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1194    enable_callbacks_reason: usize,
1195    blocking_callbacks_reason: usize,
1196    min_callback_time_reason: usize,
1197    sort_mode_reason: usize,
1198    sort_time_reason: usize,
1199    sort_size_reason: usize,
1200    enabled: Arc<AtomicBool>,
1201    blocking_mode: Arc<AtomicBool>,
1202    nd_array_port_reason: usize,
1203    sender_port_name: String,
1204    initial_upstream: String,
1205    wiring: Arc<WiringRegistry>,
1206) {
1207    let mut current_upstream = initial_upstream;
1208    let rt = tokio::runtime::Builder::new_current_thread()
1209        .enable_all()
1210        .build()
1211        .unwrap();
1212    rt.block_on(async {
1213        // Sort flush timer — starts disabled (very long interval).
1214        // Re-created when sort_time changes.
1215        let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1216        let mut sort_flush_active = false;
1217
1218        loop {
1219            tokio::select! {
1220                msg = array_rx.recv_msg() => {
1221                    match msg {
1222                        Some(msg) => {
1223                            // Process array and collect output (sync, under lock).
1224                            let (process_output, senders, port) = {
1225                                let mut guard = shared.lock();
1226                                let output = guard.process_and_publish(&msg.array);
1227                                let senders = guard.output.lock().senders_clone();
1228                                let port = guard.port_handle.clone();
1229                                (output, senders, port)
1230                            };
1231                            // msg dropped here → completion signaled (if tracked)
1232                            // Publish arrays and flush params outside the lock, in async context.
1233                            if let Some(po) = process_output {
1234                                po.publish_arrays(&senders).await;
1235                                po.batch.flush(&port).await;
1236                            }
1237                        }
1238                        None => break,
1239                    }
1240                }
1241                param = param_rx.recv() => {
1242                    match param {
1243                        Some((reason, addr, value)) => {
1244                            if reason == enable_callbacks_reason {
1245                                enabled.store(value.as_i32() != 0, Ordering::Release);
1246                            }
1247                            if reason == blocking_callbacks_reason {
1248                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1249                            }
1250                            // Handle MinCallbackTime param change
1251                            if reason == min_callback_time_reason {
1252                                shared.lock().min_callback_time = value.as_f64();
1253                            }
1254                            // Handle sort param changes
1255                            if reason == sort_mode_reason {
1256                                let mode = value.as_i32();
1257                                // Scope the guard so clippy can verify the lock
1258                                // is released before any await.
1259                                let flush_work = {
1260                                    let mut guard = shared.lock();
1261                                    guard.sort_mode = mode;
1262                                    if mode == 0 {
1263                                        let output = guard.flush_sort_buffer();
1264                                        let senders = guard.output.lock().senders_clone();
1265                                        let port = guard.port_handle.clone();
1266                                        sort_flush_active = false;
1267                                        Some((output, senders, port))
1268                                    } else {
1269                                        sort_flush_active = guard.sort_time > 0.0;
1270                                        if sort_flush_active {
1271                                            let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1272                                            sort_flush_interval = tokio::time::interval(dur);
1273                                        }
1274                                        None
1275                                    }
1276                                };
1277                                if let Some((output, senders, port)) = flush_work {
1278                                    output.publish_arrays(&senders).await;
1279                                    output.batch.flush(&port).await;
1280                                }
1281                            }
1282                            if reason == sort_time_reason {
1283                                let t = value.as_f64();
1284                                let mut guard = shared.lock();
1285                                guard.sort_time = t;
1286                                if guard.sort_mode != 0 && t > 0.0 {
1287                                    sort_flush_active = true;
1288                                    let dur = std::time::Duration::from_secs_f64(t);
1289                                    sort_flush_interval = tokio::time::interval(dur);
1290                                } else {
1291                                    sort_flush_active = false;
1292                                }
1293                                drop(guard);
1294                            }
1295                            if reason == sort_size_reason {
1296                                shared.lock().sort_size = value.as_i32();
1297                            }
1298                            // Handle NDArrayPort rewiring
1299                            if reason == nd_array_port_reason {
1300                                if let Some(new_port) = value.as_string() {
1301                                    if new_port != current_upstream {
1302                                        let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1303                                        if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1304                                            eprintln!("NDArrayPort rewire failed: {e}");
1305                                            current_upstream = old;
1306                                        }
1307                                    }
1308                                }
1309                            }
1310                            let snapshot = PluginParamSnapshot {
1311                                enable_callbacks: enabled.load(Ordering::Acquire),
1312                                reason,
1313                                addr,
1314                                value,
1315                            };
1316                            let (process_output, senders, port) = {
1317                                let mut guard = shared.lock();
1318                                let t0 = std::time::Instant::now();
1319                                let result = guard.processor.on_param_change(reason, &snapshot);
1320                                let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1321                                let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1322                                    Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
1323                                } else {
1324                                    None
1325                                };
1326                                let senders = guard.output.lock().senders_clone();
1327                                (output, senders, guard.port_handle.clone())
1328                            };
1329                            if let Some(po) = process_output {
1330                                po.publish_arrays(&senders).await;
1331                                po.batch.flush(&port).await;
1332                            }
1333                        }
1334                        None => break,
1335                    }
1336                }
1337                _ = sort_flush_interval.tick(), if sort_flush_active => {
1338                    let (output, senders, port) = {
1339                        let mut guard = shared.lock();
1340                        let output = guard.flush_sort_buffer();
1341                        let senders = guard.output.lock().senders_clone();
1342                        let port = guard.port_handle.clone();
1343                        (output, senders, port)
1344                    };
1345                    output.publish_arrays(&senders).await;
1346                    output.batch.flush(&port).await;
1347                }
1348            }
1349        }
1350    });
1351}
1352
1353/// Connect a downstream plugin's sender to a plugin runtime's output.
1354pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1355    upstream.array_output().lock().add(downstream_sender);
1356}
1357
1358/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
1359pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1360    port_name: &str,
1361    mut processor: P,
1362    pool: Arc<NDArrayPool>,
1363    queue_size: usize,
1364    output: NDArrayOutput,
1365    ndarray_port: &str,
1366    wiring: Arc<WiringRegistry>,
1367) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1368    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1369
1370    let plugin_type_name = processor.plugin_type().to_string();
1371    let array_data = processor.array_data_handle();
1372    let driver = PluginPortDriver::new(
1373        port_name,
1374        &plugin_type_name,
1375        queue_size,
1376        ndarray_port,
1377        1,
1378        param_tx,
1379        &mut processor,
1380        array_data,
1381    )
1382    .expect("failed to create plugin port driver");
1383
1384    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1385    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1386    let min_callback_time_reason = driver.plugin_params.min_callback_time;
1387    let sort_mode_reason = driver.plugin_params.sort_mode;
1388    let sort_time_reason = driver.plugin_params.sort_time;
1389    let sort_size_reason = driver.plugin_params.sort_size;
1390    let ndarray_params = driver.ndarray_params;
1391    let plugin_params = driver.plugin_params;
1392    let std_array_data_param = driver.std_array_data_param;
1393
1394    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1395
1396    let port_handle = port_runtime.port_handle().clone();
1397
1398    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1399
1400    let enabled = Arc::new(AtomicBool::new(false));
1401    let blocking_mode = Arc::new(AtomicBool::new(false));
1402
1403    let array_output = Arc::new(parking_lot::Mutex::new(output));
1404    let array_output_for_handle = array_output.clone();
1405    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1406        processor,
1407        output: array_output,
1408        pool,
1409        ndarray_params,
1410        plugin_params,
1411        port_handle,
1412        array_counter: 0,
1413        std_array_data_param,
1414        min_callback_time: 0.0,
1415        last_process_time: None,
1416        sort_mode: 0,
1417        sort_time: 0.0,
1418        sort_size: 10,
1419        sort_buffer: SortBuffer::new(),
1420    }));
1421
1422    let data_enabled = enabled.clone();
1423    let data_blocking = blocking_mode.clone();
1424
1425    let mut array_sender = array_sender;
1426    array_sender.set_mode_flags(enabled, blocking_mode);
1427
1428    // Capture wiring info for data loop
1429    let nd_array_port_reason = plugin_params.nd_array_port;
1430    let sender_port_name = port_name.to_string();
1431    let initial_upstream = ndarray_port.to_string();
1432
1433    let data_jh = thread::Builder::new()
1434        .name(format!("plugin-data-{port_name}"))
1435        .spawn(move || {
1436            plugin_data_loop(
1437                shared,
1438                array_rx,
1439                param_rx,
1440                enable_callbacks_reason,
1441                blocking_callbacks_reason,
1442                min_callback_time_reason,
1443                sort_mode_reason,
1444                sort_time_reason,
1445                sort_size_reason,
1446                data_enabled,
1447                data_blocking,
1448                nd_array_port_reason,
1449                sender_port_name,
1450                initial_upstream,
1451                wiring,
1452            );
1453        })
1454        .expect("failed to spawn plugin data thread");
1455
1456    let handle = PluginRuntimeHandle {
1457        port_runtime,
1458        array_sender,
1459        array_output: array_output_for_handle,
1460        port_name: port_name.to_string(),
1461        ndarray_params,
1462        plugin_params,
1463    };
1464
1465    (handle, data_jh)
1466}
1467
1468#[cfg(test)]
1469mod tests {
1470    use super::*;
1471    use crate::ndarray::{NDDataType, NDDimension};
1472    use crate::plugin::channel::ndarray_channel;
1473
1474    /// Passthrough processor: returns the input array as-is.
1475    struct PassthroughProcessor;
1476
1477    impl NDPluginProcess for PassthroughProcessor {
1478        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1479            ProcessResult::arrays(vec![Arc::new(array.clone())])
1480        }
1481        fn plugin_type(&self) -> &str {
1482            "Passthrough"
1483        }
1484    }
1485
1486    /// Sink processor: consumes arrays, returns nothing.
1487    struct SinkProcessor {
1488        count: usize,
1489    }
1490
1491    impl NDPluginProcess for SinkProcessor {
1492        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1493            self.count += 1;
1494            ProcessResult::empty()
1495        }
1496        fn plugin_type(&self) -> &str {
1497            "Sink"
1498        }
1499    }
1500
1501    fn make_test_array(id: i32) -> Arc<NDArray> {
1502        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1503        arr.unique_id = id;
1504        Arc::new(arr)
1505    }
1506
1507    fn test_wiring() -> Arc<WiringRegistry> {
1508        Arc::new(WiringRegistry::new())
1509    }
1510
1511    /// Enable callbacks on a plugin handle (plugins default to disabled).
1512    fn enable_callbacks(handle: &PluginRuntimeHandle) {
1513        handle
1514            .port_runtime()
1515            .port_handle()
1516            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1517            .unwrap();
1518        std::thread::sleep(std::time::Duration::from_millis(10));
1519    }
1520
1521    /// Send an array via the sender from a sync test context.
1522    /// Uses a dedicated thread with a current-thread runtime to avoid
1523    /// interfering with the plugin's own runtime.
1524    fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
1525        let sender = sender.clone();
1526        let jh = std::thread::spawn(move || {
1527            let rt = tokio::runtime::Builder::new_current_thread()
1528                .enable_all()
1529                .build()
1530                .unwrap();
1531            rt.block_on(sender.publish(array));
1532        });
1533        jh.join().unwrap();
1534    }
1535
1536    #[test]
1537    fn test_passthrough_runtime() {
1538        let pool = Arc::new(NDArrayPool::new(1_000_000));
1539
1540        // Create downstream receiver
1541        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1542        let mut output = NDArrayOutput::new();
1543        output.add(downstream_sender);
1544
1545        let (handle, _data_jh) = create_plugin_runtime_with_output(
1546            "PASS1",
1547            PassthroughProcessor,
1548            pool,
1549            10,
1550            output,
1551            "",
1552            test_wiring(),
1553        );
1554        enable_callbacks(&handle);
1555
1556        // Send an array
1557        send_array(handle.array_sender(), make_test_array(42));
1558
1559        // Should come out the other side
1560        let received = downstream_rx.blocking_recv().unwrap();
1561        assert_eq!(received.unique_id, 42);
1562    }
1563
1564    #[test]
1565    fn test_sink_runtime() {
1566        let pool = Arc::new(NDArrayPool::new(1_000_000));
1567
1568        let (handle, _data_jh) = create_plugin_runtime(
1569            "SINK1",
1570            SinkProcessor { count: 0 },
1571            pool,
1572            10,
1573            "",
1574            test_wiring(),
1575        );
1576        enable_callbacks(&handle);
1577
1578        // Send arrays - they should be consumed silently
1579        send_array(handle.array_sender(), make_test_array(1));
1580        send_array(handle.array_sender(), make_test_array(2));
1581
1582        // Give processing thread time
1583        std::thread::sleep(std::time::Duration::from_millis(50));
1584
1585        // No crash, no output needed
1586        assert_eq!(handle.port_name(), "SINK1");
1587    }
1588
1589    #[test]
1590    fn test_plugin_type_param() {
1591        let pool = Arc::new(NDArrayPool::new(1_000_000));
1592
1593        let (handle, _data_jh) = create_plugin_runtime(
1594            "TYPE_TEST",
1595            PassthroughProcessor,
1596            pool,
1597            10,
1598            "",
1599            test_wiring(),
1600        );
1601
1602        // Verify port name
1603        assert_eq!(handle.port_name(), "TYPE_TEST");
1604        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1605    }
1606
1607    #[test]
1608    fn test_shutdown_on_handle_drop() {
1609        let pool = Arc::new(NDArrayPool::new(1_000_000));
1610
1611        let (handle, data_jh) = create_plugin_runtime(
1612            "SHUTDOWN_TEST",
1613            PassthroughProcessor,
1614            pool,
1615            10,
1616            "",
1617            test_wiring(),
1618        );
1619
1620        // Drop the handle (closes sender channel, which should cause data thread to exit)
1621        let sender = handle.array_sender().clone();
1622        drop(handle);
1623        drop(sender);
1624
1625        // Data thread should terminate
1626        let result = data_jh.join();
1627        assert!(result.is_ok());
1628    }
1629
1630    #[test]
1631    fn test_nonblocking_passthrough() {
1632        let pool = Arc::new(NDArrayPool::new(1_000_000));
1633        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1634        let mut output = NDArrayOutput::new();
1635        output.add(downstream_sender);
1636
1637        let (handle, _data_jh) = create_plugin_runtime_with_output(
1638            "NB_TEST",
1639            PassthroughProcessor,
1640            pool,
1641            10,
1642            output,
1643            "",
1644            test_wiring(),
1645        );
1646        enable_callbacks(&handle);
1647
1648        send_array(handle.array_sender(), make_test_array(42));
1649
1650        let received = downstream_rx.blocking_recv().unwrap();
1651        assert_eq!(received.unique_id, 42);
1652    }
1653
/// Writes `blocking_callbacks` = 1 and then = 0, and after each write checks
/// that an array sent to the plugin still reaches the downstream channel.
#[test]
fn test_blocking_to_nonblocking_switch() {
    let (ds_tx, mut ds_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut fanout = NDArrayOutput::new();
    fanout.add(ds_tx);

    let array_pool = Arc::new(NDArrayPool::new(1_000_000));
    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "SWITCH_TEST",
        PassthroughProcessor,
        array_pool,
        10,
        fanout,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    let blocking_reason = handle.plugin_params.blocking_callbacks;
    // Pause used after every parameter write before the next array is sent.
    let settle = std::time::Duration::from_millis(50);

    // Phase 1: blocking mode on.
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(blocking_reason, 0, 1)
        .unwrap();
    std::thread::sleep(settle);

    let first_frame = make_test_array(1);
    send_array(handle.array_sender(), first_frame);
    let first_id = ds_rx.blocking_recv().unwrap().unique_id;
    assert_eq!(first_id, 1);

    // Phase 2: back to non-blocking mode.
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(blocking_reason, 0, 0)
        .unwrap();
    std::thread::sleep(settle);

    // In non-blocking mode the array travels through the channel to the data thread.
    let second_frame = make_test_array(2);
    send_array(handle.array_sender(), second_frame);
    let second_id = ds_rx.blocking_recv().unwrap().unique_id;
    assert_eq!(second_id, 2);
}
1697
/// Writes `enable_callbacks` = 0, sends an array, and asserts that nothing
/// arrives on the downstream channel within a 100 ms timeout.
#[test]
fn test_enable_callbacks_disables_processing() {
    let (ds_tx, mut ds_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut fanout = NDArrayOutput::new();
    fanout.add(ds_tx);

    let array_pool = Arc::new(NDArrayPool::new(1_000_000));
    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "ENABLE_TEST",
        PassthroughProcessor,
        array_pool,
        10,
        fanout,
        "",
        test_wiring(),
    );

    // Turn callbacks off, then pause before sending.
    let enable_reason = handle.plugin_params.enable_callbacks;
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(enable_reason, 0, 0)
        .unwrap();
    std::thread::sleep(std::time::Duration::from_millis(50));

    // With callbacks disabled this array is expected to be dropped silently.
    let frame = make_test_array(99);
    send_array(handle.array_sender(), frame);

    // A timeout error here means no array was delivered in the wait window.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let outcome = rt.block_on(async {
        // The timeout is built inside the async block so it is created while
        // the runtime is being driven by `block_on`.
        let window = std::time::Duration::from_millis(100);
        tokio::time::timeout(window, ds_rx.recv()).await
    });
    assert!(
        outcome.is_err(),
        "should not receive array when callbacks disabled"
    );
}
1739
/// Registers two downstream channels on one plugin output and checks that a
/// single sent array (`unique_id` 77) is received on both of them.
#[test]
fn test_downstream_receives_multiple() {
    let array_pool = Arc::new(NDArrayPool::new(1_000_000));

    // Two independent downstream consumers attached to the same output.
    let (first_tx, mut first_rx) = ndarray_channel("DS1", 10);
    let (second_tx, mut second_rx) = ndarray_channel("DS2", 10);
    let mut fanout = NDArrayOutput::new();
    fanout.add(first_tx);
    fanout.add(second_tx);

    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "DS_TEST",
        PassthroughProcessor,
        array_pool,
        10,
        fanout,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    let frame = make_test_array(77);
    send_array(handle.array_sender(), frame);

    // Collect from both receivers first, then verify each id.
    let first_id = first_rx.blocking_recv().unwrap().unique_id;
    let second_id = second_rx.blocking_recv().unwrap().unique_id;
    assert_eq!(first_id, 77);
    assert_eq!(second_id, 77);
}
1769
/// Sends an array, writes `enable_callbacks` = 1 through the port handle, and
/// sends a second array; both must be received downstream (ids 1 and 2).
#[test]
fn test_param_updates_after_send() {
    // Test-local processor: emits a freshly allocated copy of each input array.
    struct ParamTracker;
    impl NDPluginProcess for ParamTracker {
        fn process_array(&mut self, input: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            let copy = Arc::new(input.clone());
            ProcessResult::arrays(vec![copy])
        }
        fn plugin_type(&self) -> &str {
            "ParamTracker"
        }
    }

    let array_pool = Arc::new(NDArrayPool::new(1_000_000));

    let (ds_tx, mut ds_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut fanout = NDArrayOutput::new();
    fanout.add(ds_tx);

    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "PARAM_TEST",
        ParamTracker,
        array_pool,
        10,
        fanout,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    // First array, before any explicit parameter write.
    let first_frame = make_test_array(1);
    send_array(handle.array_sender(), first_frame);
    let first_id = ds_rx.blocking_recv().unwrap().unique_id;
    assert_eq!(first_id, 1);

    // Writing enable_callbacks again must succeed without crashing.
    let enable_reason = handle.plugin_params.enable_callbacks;
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(enable_reason, 0, 1)
        .unwrap();
    std::thread::sleep(std::time::Duration::from_millis(50));

    // Delivery still works after the parameter write.
    let second_frame = make_test_array(2);
    send_array(handle.array_sender(), second_frame);
    let second_id = ds_rx.blocking_recv().unwrap().unique_id;
    assert_eq!(second_id, 2);
}
1817
/// Inserts ids 3, 1, 2 into a `SortBuffer` and checks that `drain_all` yields
/// them as 1, 2, 3, empties the buffer, and leaves `last_emitted_id` at 3.
#[test]
fn test_sort_buffer_reorders_by_unique_id() {
    let mut sorter = SortBuffer::new();

    // Deliberately shuffled insertion order: 3, 1, 2.
    let third = vec![make_test_array(3)];
    sorter.insert(3, third, 10);
    let first = vec![make_test_array(1)];
    sorter.insert(1, first, 10);
    let second = vec![make_test_array(2)];
    sorter.insert(2, second, 10);

    assert_eq!(sorter.len(), 3);

    let flushed = sorter.drain_all();
    let order: Vec<i32> = flushed.iter().map(|(id, _)| *id).collect();
    assert_eq!(
        order,
        vec![1, 2, 3],
        "should drain in sorted uniqueId order"
    );
    assert_eq!(sorter.len(), 0);
    assert_eq!(sorter.last_emitted_id, 3);
}
1835
/// Drains id 5 from a `SortBuffer`, then inserts id 3 and checks that the
/// `disordered_arrays` counter reads 1.
#[test]
fn test_sort_buffer_detects_disordered() {
    let mut sorter = SortBuffer::new();

    // Push id=5 and flush it so the buffer has already emitted id 5.
    let late = vec![make_test_array(5)];
    sorter.insert(5, late, 10);
    let _ = sorter.drain_all();

    // id=3 arrives after id=5 was emitted, so it counts as disordered.
    let early = vec![make_test_array(3)];
    sorter.insert(3, early, 10);
    assert_eq!(sorter.disordered_arrays, 1);
}
1847
/// Inserts three entries with a sort size of 2 and checks that the buffer
/// keeps only ids 2 and 3 while `dropped_output_arrays` reads 1.
#[test]
fn test_sort_buffer_drops_when_full() {
    let mut sorter = SortBuffer::new();

    // Capacity is 2 on every insert, so the third insert overflows the buffer.
    let first = vec![make_test_array(1)];
    sorter.insert(1, first, 2);
    let second = vec![make_test_array(2)];
    sorter.insert(2, second, 2);
    let third = vec![make_test_array(3)];
    sorter.insert(3, third, 2);

    // Two entries remain; the oldest one was discarded.
    assert_eq!(sorter.len(), 2);
    assert_eq!(sorter.dropped_output_arrays, 1);

    let flushed = sorter.drain_all();
    let order: Vec<i32> = flushed.iter().map(|(id, _)| *id).collect();
    assert_eq!(
        order,
        vec![2, 3],
        "oldest (id=1) should have been dropped"
    );
}
1865
/// End-to-end sort-mode check: with `sort_mode` = 1 and `sort_size` = 10,
/// arrays sent as 3, 1, 2 must not appear downstream; after `sort_mode` is
/// written back to 0 they must arrive in the order 1, 2, 3.
#[test]
fn test_sort_mode_runtime_integration() {
    let (ds_tx, mut ds_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut fanout = NDArrayOutput::new();
    fanout.add(ds_tx);

    let array_pool = Arc::new(NDArrayPool::new(1_000_000));
    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "SORT_TEST",
        PassthroughProcessor,
        array_pool,
        10,
        fanout,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    let sort_size_reason = handle.plugin_params.sort_size;
    let sort_mode_reason = handle.plugin_params.sort_mode;

    // Turn sort mode on: size first (10), then the mode flag.
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(sort_size_reason, 0, 10)
        .unwrap();
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(sort_mode_reason, 0, 1)
        .unwrap();
    std::thread::sleep(std::time::Duration::from_millis(50));

    // Feed the arrays in shuffled order, then pause before checking.
    for id in [3, 1, 2] {
        send_array(handle.array_sender(), make_test_array(id));
    }
    std::thread::sleep(std::time::Duration::from_millis(100));

    // While sort mode is on, nothing should have been forwarded yet.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let outcome = rt.block_on(async {
        // The timeout is built inside the async block so it is created while
        // the runtime is being driven by `block_on`.
        let window = std::time::Duration::from_millis(50);
        tokio::time::timeout(window, ds_rx.recv()).await
    });
    assert!(
        outcome.is_err(),
        "arrays should be buffered while sort mode is active"
    );

    // Turning sort mode off should flush everything that was buffered, in order.
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(sort_mode_reason, 0, 0)
        .unwrap();
    std::thread::sleep(std::time::Duration::from_millis(100));

    // Pull all three flushed arrays before checking their order.
    let first_id = ds_rx.blocking_recv().unwrap().unique_id;
    let second_id = ds_rx.blocking_recv().unwrap().unique_id;
    let third_id = ds_rx.blocking_recv().unwrap().unique_id;
    assert_eq!(first_id, 1);
    assert_eq!(second_id, 2);
    assert_eq!(third_id, 3);
}
1932}