Skip to main content

ad_core_rs/plugin/
runtime.rs

1//! Plugin runtime: control plane (PortActor) + data plane (processing thread).
2//!
3//! # Single-threaded data plane (intentional, G4)
4//!
5//! C++ `NDPluginDriver` runs `numThreads` worker threads sharing one input
6//! queue (`createCallbackThreads`). The Rust port deliberately runs **exactly
7//! one** per-plugin data thread driving a `tokio::select!` loop. This is an
8//! intentional design choice: a single owner of the processing state removes
9//! the C++ worker-pool races (shared `prevUniqueId_`, sort-buffer contention)
10//! and keeps array ordering trivially correct. The `NUM_THREADS` / `MAX_THREADS`
11//! PVs are therefore not backed by a real worker pool — instead `NumThreads`
12//! is validated and clamped to `[1, MaxThreads]` on write and the clamped
13//! value is written back, so the PV is honest about the accepted value rather
14//! than silently inert.
15
16use std::collections::BTreeMap;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::thread;
20
21use asyn_rs::error::AsynResult;
22use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
23use asyn_rs::runtime::config::RuntimeConfig;
24use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
25use asyn_rs::user::AsynUser;
26
27use asyn_rs::port_handle::PortHandle;
28
29use crate::ndarray::NDArray;
30use crate::ndarray_pool::NDArrayPool;
31use crate::params::ndarray_driver::NDArrayDriverParams;
32
33use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
34use super::params::PluginBaseParams;
35use super::wiring::{WiringRegistry, upstream_key};
36
37/// Value sent through the param change channel from control plane to data plane.
38#[derive(Debug, Clone)]
39pub enum ParamChangeValue {
40    Int32(i32),
41    Float64(f64),
42    Octet(String),
43}
44
45impl ParamChangeValue {
46    pub fn as_i32(&self) -> i32 {
47        match self {
48            ParamChangeValue::Int32(v) => *v,
49            ParamChangeValue::Float64(v) => *v as i32,
50            ParamChangeValue::Octet(_) => 0,
51        }
52    }
53
54    pub fn as_f64(&self) -> f64 {
55        match self {
56            ParamChangeValue::Int32(v) => *v as f64,
57            ParamChangeValue::Float64(v) => *v,
58            ParamChangeValue::Octet(_) => 0.0,
59        }
60    }
61
62    pub fn as_string(&self) -> Option<&str> {
63        match self {
64            ParamChangeValue::Octet(s) => Some(s),
65            _ => None,
66        }
67    }
68}
69
70/// A single parameter update produced by a plugin's process_array.
71pub enum ParamUpdate {
72    Int32 {
73        reason: usize,
74        addr: i32,
75        value: i32,
76    },
77    Float64 {
78        reason: usize,
79        addr: i32,
80        value: f64,
81    },
82    Octet {
83        reason: usize,
84        addr: i32,
85        value: String,
86    },
87    Float64Array {
88        reason: usize,
89        addr: i32,
90        value: Vec<f64>,
91    },
92}
93
94impl ParamUpdate {
95    /// Create an Int32 update at addr 0.
96    pub fn int32(reason: usize, value: i32) -> Self {
97        Self::Int32 {
98            reason,
99            addr: 0,
100            value,
101        }
102    }
103    /// Create a Float64 update at addr 0.
104    pub fn float64(reason: usize, value: f64) -> Self {
105        Self::Float64 {
106            reason,
107            addr: 0,
108            value,
109        }
110    }
111    /// Create an Int32 update at a specific addr.
112    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
113        Self::Int32 {
114            reason,
115            addr,
116            value,
117        }
118    }
119    /// Create a Float64 update at a specific addr.
120    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
121        Self::Float64 {
122            reason,
123            addr,
124            value,
125        }
126    }
127    /// Create a Float64Array update at addr 0.
128    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
129        Self::Float64Array {
130            reason,
131            addr: 0,
132            value,
133        }
134    }
135    /// Create a Float64Array update at a specific addr.
136    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
137        Self::Float64Array {
138            reason,
139            addr,
140            value,
141        }
142    }
143    /// Create an Octet (string) update at addr 0.
144    pub fn octet(reason: usize, value: String) -> Self {
145        Self::Octet {
146            reason,
147            addr: 0,
148            value,
149        }
150    }
151    /// Create an Octet (string) update at a specific addr.
152    pub fn octet_addr(reason: usize, addr: i32, value: String) -> Self {
153        Self::Octet {
154            reason,
155            addr,
156            value,
157        }
158    }
159}
160
161/// Result of processing one array: output arrays + param updates to write back.
162pub struct ProcessResult {
163    pub output_arrays: Vec<Arc<NDArray>>,
164    pub param_updates: Vec<ParamUpdate>,
165    /// If set, only publish to the subscriber at this index (round-robin scatter).
166    pub scatter_index: Option<usize>,
167}
168
169impl ProcessResult {
170    /// Convenience: sink plugin with only param updates, no output arrays.
171    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
172        Self {
173            output_arrays: vec![],
174            param_updates,
175            scatter_index: None,
176        }
177    }
178
179    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
180    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
181        Self {
182            output_arrays,
183            param_updates: vec![],
184            scatter_index: None,
185        }
186    }
187
188    /// Convenience: no outputs, no param updates.
189    pub fn empty() -> Self {
190        Self {
191            output_arrays: vec![],
192            param_updates: vec![],
193            scatter_index: None,
194        }
195    }
196
197    /// Convenience: scatter output — send to a single subscriber by index.
198    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
199        Self {
200            output_arrays,
201            param_updates: vec![],
202            scatter_index: Some(index),
203        }
204    }
205}
206
207/// Result of handling a control-plane param change.
208pub struct ParamChangeResult {
209    pub output_arrays: Vec<Arc<NDArray>>,
210    pub param_updates: Vec<ParamUpdate>,
211}
212
213impl ParamChangeResult {
214    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
215        Self {
216            output_arrays: vec![],
217            param_updates,
218        }
219    }
220
221    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
222        Self {
223            output_arrays,
224            param_updates: vec![],
225        }
226    }
227
228    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
229        Self {
230            output_arrays,
231            param_updates,
232        }
233    }
234
235    pub fn empty() -> Self {
236        Self {
237            output_arrays: vec![],
238            param_updates: vec![],
239        }
240    }
241}
242
243/// Pure processing logic. No threading concerns.
244pub trait NDPluginProcess: Send + 'static {
245    /// Process one array. Return output arrays and param updates.
246    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
247
248    /// Plugin type name for PLUGIN_TYPE param.
249    fn plugin_type(&self) -> &str;
250
251    /// Whether this plugin can process compressed (`codec != None`) arrays
252    /// (C++ `compressionAware_`, G3). Defaults to `false`: a plugin that
253    /// operates on raw pixels must not be handed compressed bytes — the
254    /// runtime drops compressed input and counts it into DroppedArrays.
255    /// A codec/file plugin that understands compressed data overrides this.
256    fn compression_aware(&self) -> bool {
257        false
258    }
259
260    /// Register plugin-specific params on the base. Called once during construction.
261    fn register_params(
262        &mut self,
263        _base: &mut PortDriverBase,
264    ) -> Result<(), asyn_rs::error::AsynError> {
265        Ok(())
266    }
267
268    /// Called when a param changes. Reason is the param index.
269    /// Return param updates to be written back to the port driver.
270    fn on_param_change(
271        &mut self,
272        _reason: usize,
273        _params: &PluginParamSnapshot,
274    ) -> ParamChangeResult {
275        ParamChangeResult::empty()
276    }
277
278    /// Return a handle to the latest NDArray data for array reads.
279    /// Override this in plugins like NDPluginStdArrays that serve pixel data
280    /// via readInt8Array/readInt16Array/etc.
281    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
282        None
283    }
284}
285
286/// Read-only snapshot of param values available to the processing thread.
287pub struct PluginParamSnapshot {
288    pub enable_callbacks: bool,
289    /// The param reason that changed.
290    pub reason: usize,
291    /// The address (sub-device) that changed.
292    pub addr: i32,
293    /// The new value.
294    pub value: ParamChangeValue,
295}
296
297/// One buffered entry in the sort buffer: the output arrays for a uniqueId
298/// plus the instant they were inserted (for the per-element staleness
299/// deadline — C++ `sortedListElement::insertionTime_`).
300struct SortEntry {
301    arrays: Vec<Arc<NDArray>>,
302    inserted: std::time::Instant,
303}
304
305/// Sort buffer for reordering out-of-order output arrays by uniqueId.
306///
307/// Port of C++ `sortedNDArrayList_` semantics (NDPluginDriver.cpp).
308/// Only arrays that arrive *out of order* are buffered here — in-order
309/// arrays are emitted immediately by the caller (B2). The drain logic
310/// (`drain_ready`) releases the head while the next-expected uniqueId is
311/// contiguous OR the head has been buffered longer than `sort_time` (B3).
312struct SortBuffer {
313    /// Buffered out-of-order arrays keyed by uniqueId.
314    entries: BTreeMap<i32, SortEntry>,
315    /// uniqueId of the last array emitted downstream (C++ `prevUniqueId_`).
316    prev_unique_id: i32,
317    /// Whether any array has been emitted yet (C++ `firstOutputArray_`).
318    first_output: bool,
319    /// Cumulative count of arrays emitted out of order (C++ DisorderedArrays).
320    disordered_arrays: i32,
321    /// Cumulative count of arrays dropped because the buffer was full
322    /// (C++ DroppedOutputArrays — sort-buffer-overflow portion).
323    dropped_output_arrays: i32,
324}
325
326impl SortBuffer {
327    fn new() -> Self {
328        Self {
329            entries: BTreeMap::new(),
330            prev_unique_id: 0,
331            first_output: true,
332            disordered_arrays: 0,
333            dropped_output_arrays: 0,
334        }
335    }
336
337    /// True if `unique_id` follows `prev_unique_id` in order (C++ `orderOK`).
338    fn order_ok(&self, unique_id: i32) -> bool {
339        unique_id == self.prev_unique_id || unique_id == self.prev_unique_id + 1
340    }
341
342    /// Record that an array with `unique_id` was emitted downstream.
343    /// Updates `prev_unique_id` and counts a disorder if it was out of order.
344    fn note_emitted(&mut self, unique_id: i32) {
345        if !self.first_output && !self.order_ok(unique_id) {
346            self.disordered_arrays += 1;
347        }
348        self.first_output = false;
349        self.prev_unique_id = unique_id;
350    }
351
352    /// Insert an out-of-order array into the sort buffer.
353    ///
354    /// Returns `false` if the buffer was full and the array was dropped
355    /// (C++ NDPluginDriver.cpp:307-316), `true` if buffered.
356    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) -> bool {
357        if sort_size > 0 && self.entries.len() as i32 >= sort_size {
358            self.dropped_output_arrays += 1;
359            return false;
360        }
361        self.entries
362            .entry(unique_id)
363            .or_insert_with(|| SortEntry {
364                arrays: Vec::new(),
365                inserted: std::time::Instant::now(),
366            })
367            .arrays
368            .extend(arrays);
369        true
370    }
371
372    /// Drain the buffer head-first while either the next expected uniqueId is
373    /// contiguous OR the head element has aged past `sort_time` seconds.
374    /// Port of C++ `sortingTask` loop (NDPluginDriver.cpp:619-670).
375    fn drain_ready(&mut self, sort_time: f64) -> Vec<(i32, Vec<Arc<NDArray>>)> {
376        let now = std::time::Instant::now();
377        let mut out = Vec::new();
378        while let Some((&head_id, entry)) = self.entries.iter().next() {
379            let delta = now.duration_since(entry.inserted).as_secs_f64();
380            let order_ok = self.order_ok(head_id);
381            if (!self.first_output && order_ok) || delta > sort_time {
382                let entry = self.entries.remove(&head_id).unwrap();
383                self.note_emitted(head_id);
384                out.push((head_id, entry.arrays));
385            } else {
386                break;
387            }
388        }
389        out
390    }
391
392    /// Drain every buffered array in uniqueId order, regardless of contiguity
393    /// or age. Used when sort mode is turned off.
394    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
395        let entries = std::mem::take(&mut self.entries);
396        let mut out = Vec::with_capacity(entries.len());
397        for (id, entry) in entries {
398            self.note_emitted(id);
399            out.push((id, entry.arrays));
400        }
401        out
402    }
403
404    /// Number of uniqueId entries currently buffered.
405    fn len(&self) -> i32 {
406        self.entries.len() as i32
407    }
408}
409
410/// Shared processor state protected by a mutex, accessible from both
411/// the data thread (non-blocking mode) and the caller thread (blocking mode).
412struct SharedProcessorInner<P: NDPluginProcess> {
413    processor: P,
414    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
415    pool: Arc<NDArrayPool>,
416    ndarray_params: NDArrayDriverParams,
417    plugin_params: PluginBaseParams,
418    port_handle: PortHandle,
419    /// ArrayCounter — owned in the param library (C++ `NDArrayCounter`), held
420    /// here only as a working copy that is kept in sync with the param so a
421    /// control-plane write of `ARRAY_COUNTER` resets it (B12).
422    array_counter: i32,
423    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
424    std_array_data_param: Option<usize>,
425    /// MinCallbackTime throttling: minimum seconds between process calls.
426    min_callback_time: f64,
427    /// Last time process_and_publish was called (for throttling).
428    last_process_time: Option<std::time::Instant>,
429    /// Sort mode: 0 = disabled, 1 = sorted output.
430    sort_mode: i32,
431    /// Sort time: seconds — per-element staleness deadline for the sort buffer.
432    sort_time: f64,
433    /// Sort size: maximum number of uniqueId entries in the sort buffer.
434    sort_size: i32,
435    /// Sort buffer for reordering output arrays by uniqueId.
436    sort_buffer: SortBuffer,
437    /// Cumulative count of dropped *input* arrays (full queue / compression
438    /// gate / MinCallbackTime throttle). Shared with every upstream sender so
439    /// full-queue drops are visible here (G1, B1, B5).
440    dropped_arrays: Arc<std::sync::atomic::AtomicI32>,
441    /// Whether this plugin can process compressed (`codec != None`) arrays.
442    /// A non-compression-aware plugin drops compressed input (G3).
443    compression_aware: bool,
444    /// Output byte-rate limit (C++ `MaxByteRate`); 0 disables throttling.
445    max_byte_rate: f64,
446    /// Token-bucket throttler enforcing `max_byte_rate` on the output path (G7).
447    throttler: super::throttler::Throttler,
448    /// Last *input* array, cached for ProcessPlugin re-injection
449    /// (C++ `pPrevInputArray_`, G5). Released on `EnableCallbacks=0` (B6).
450    prev_input_array: Option<Arc<NDArray>>,
451    /// Previous array dimensions, for firing an NDDimensions int32-array
452    /// callback when dimensions change (C++ `dimsPrev_`, G8).
453    dims_prev: Vec<i32>,
454    /// Source address selected via the NDArrayAddr PV (C++ `NDArrayAddr`, G6).
455    nd_array_addr: i32,
456    /// MaxThreads — the clamp ceiling for NumThreads (C++ `MaxThreads`).
457    max_threads: i32,
458    /// NumThreads — validated/clamped to [1, MaxThreads] on write (G4).
459    num_threads: i32,
460}
461
462impl<P: NDPluginProcess> SharedProcessorInner<P> {
463    fn should_throttle(&self) -> bool {
464        if self.min_callback_time <= 0.0 {
465            return false;
466        }
467        if let Some(last) = self.last_process_time {
468            last.elapsed().as_secs_f64() < self.min_callback_time
469        } else {
470            false
471        }
472    }
473
474    /// Byte cost of an array for throttling (C++ `NDPluginDriver::throttled`):
475    /// compressed size when a codec is present, else total raw bytes.
476    fn array_byte_cost(array: &NDArray) -> f64 {
477        match &array.codec {
478            Some(c) => c.compressed_size as f64,
479            None => array.info().total_bytes as f64,
480        }
481    }
482
483    /// Apply the output throttle to one array. Returns `true` if the array
484    /// should be emitted, `false` if it was dropped (and counts the drop).
485    fn throttle_ok(&mut self, array: &NDArray) -> bool {
486        if self.max_byte_rate == 0.0 {
487            return true;
488        }
489        let cost = Self::array_byte_cost(array);
490        if self.throttler.try_take(cost) {
491            true
492        } else {
493            self.sort_buffer.dropped_output_arrays += 1;
494            false
495        }
496    }
497
498    /// Route output arrays through the throttle, the in-order fast path, and
499    /// the sort buffer. Returns arrays ready to emit *now*, in order.
500    ///
501    /// Port of C++ `endProcessCallbacks` (NDPluginDriver.cpp:295-328): an
502    /// array whose uniqueId is contiguous with `prevUniqueId_` is emitted
503    /// immediately (B2); only out-of-order arrays enter the sort buffer.
504    /// Disordered arrays are counted at emission time in both modes (B4).
505    fn route_output_arrays(&mut self, arrays: Vec<Arc<NDArray>>) -> Vec<Arc<NDArray>> {
506        let mut ready = Vec::new();
507        for arr in arrays {
508            if !self.throttle_ok(&arr) {
509                continue; // G7: dropped by MaxByteRate throttle
510            }
511            let uid = arr.unique_id;
512            if self.sort_mode != 0
513                && !self.sort_buffer.first_output
514                && !self.sort_buffer.order_ok(uid)
515            {
516                // Out of order with sort mode on: buffer it (B2/B3).
517                self.sort_buffer.insert(uid, vec![arr], self.sort_size);
518            } else {
519                // In order (or sort mode off): emit immediately, count disorder.
520                self.sort_buffer.note_emitted(uid);
521                ready.push(arr);
522            }
523        }
524        // After emitting in-order arrays, the sort buffer head may now be
525        // contiguous — release any newly-ready run (C++ sortingTask).
526        if self.sort_mode != 0 {
527            for (_id, mut bucket) in self.sort_buffer.drain_ready(self.sort_time) {
528                ready.append(&mut bucket);
529            }
530        }
531        ready
532    }
533
534    /// Process array and return a `ProcessOutput`. Does NOT send to actor.
535    /// Direct interrupts (std_array_data_param) happen here (sync).
536    /// The returned output must be published and flushed by the caller in async context.
537    fn process_and_publish(&mut self, array: &Arc<NDArray>) -> Option<ProcessOutput> {
538        // B5: a MinCallbackTime-throttled array is dropped — count it.
539        if self.should_throttle() {
540            self.dropped_arrays
541                .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
542            return Some(self.dropped_arrays_only_batch());
543        }
544        // R2/G5: cache the input array for ProcessPlugin re-injection only
545        // for arrays that actually pass the MinCallbackTime gate and are
546        // processed. C++ sets pPrevInputArray_ in beginProcessCallbacks,
547        // which runs inside processCallbacks — never for throttled frames.
548        self.prev_input_array = Some(Arc::clone(array));
549        let t0 = std::time::Instant::now();
550        let result = self.processor.process_array(array, &self.pool);
551        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
552        self.last_process_time = Some(t0);
553
554        let ready = self.route_output_arrays(result.output_arrays);
555        let mut output = self.build_publish_batch(
556            ready,
557            result.param_updates,
558            result.scatter_index,
559            Some(array.as_ref()),
560            elapsed_ms,
561        );
562        output.batch.merge(self.build_status_params_batch());
563        Some(output)
564    }
565
566    /// A param batch carrying only the current DroppedArrays / queue counters,
567    /// used when an array is dropped before processing (B5).
568    fn dropped_arrays_only_batch(&self) -> ProcessOutput {
569        ProcessOutput {
570            arrays: vec![],
571            scatter_index: None,
572            batch: self.build_status_params_batch(),
573        }
574    }
575
576    /// Re-inject the cached previous input array through the normal process
577    /// path (C++ ProcessPlugin, NDPluginDriver.cpp:739-746, G5).
578    fn process_plugin(&mut self) -> Option<ProcessOutput> {
579        let prev = self.prev_input_array.clone()?;
580        self.process_and_publish(&prev)
581    }
582
583    /// Flush the sort buffer head-first while contiguous or stale (C++
584    /// sortingTask periodic tick). Does NOT drain non-contiguous fresh arrays.
585    fn tick_sort_buffer(&mut self) -> ProcessOutput {
586        let entries = self.sort_buffer.drain_ready(self.sort_time);
587        self.emit_drained(entries)
588    }
589
590    /// Drain the entire sort buffer in uniqueId order (sort mode turned off).
591    fn flush_sort_buffer(&mut self) -> ProcessOutput {
592        let entries = self.sort_buffer.drain_all();
593        self.emit_drained(entries)
594    }
595
596    fn emit_drained(&mut self, entries: Vec<(i32, Vec<Arc<NDArray>>)>) -> ProcessOutput {
597        let mut all_arrays = Vec::new();
598        let mut combined = ParamBatch::empty();
599        for (_unique_id, arrays) in entries {
600            let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
601            all_arrays.extend(output.arrays);
602            combined.merge(output.batch);
603        }
604        combined.merge(self.build_sort_params_batch());
605        ProcessOutput {
606            arrays: all_arrays,
607            scatter_index: None,
608            batch: combined,
609        }
610    }
611
612    fn build_sort_params_batch(&self) -> ParamBatch {
613        use asyn_rs::request::ParamSetValue;
614        let sort_free = self.sort_size - self.sort_buffer.len();
615        ParamBatch {
616            addr0: vec![
617                ParamSetValue::Int32 {
618                    reason: self.plugin_params.sort_free,
619                    addr: 0,
620                    value: sort_free,
621                },
622                ParamSetValue::Int32 {
623                    reason: self.plugin_params.disordered_arrays,
624                    addr: 0,
625                    value: self.sort_buffer.disordered_arrays,
626                },
627                ParamSetValue::Int32 {
628                    reason: self.plugin_params.dropped_output_arrays,
629                    addr: 0,
630                    value: self.sort_buffer.dropped_output_arrays,
631                },
632            ],
633            extra: std::collections::HashMap::new(),
634        }
635    }
636
637    /// Build a param batch carrying the runtime status counters:
638    /// DroppedArrays (G1) plus the sort/disorder counters.
639    fn build_status_params_batch(&self) -> ParamBatch {
640        use asyn_rs::request::ParamSetValue;
641        let mut batch = self.build_sort_params_batch();
642        batch.addr0.push(ParamSetValue::Int32 {
643            reason: self.plugin_params.dropped_arrays,
644            addr: 0,
645            value: self
646                .dropped_arrays
647                .load(std::sync::atomic::Ordering::Acquire),
648        });
649        batch
650    }
651
652    /// Build a ProcessOutput: fires direct interrupts (sync) and collects
653    /// param updates into a batch. Does NOT publish arrays — the caller
654    /// must publish them in async context.
655    fn build_publish_batch(
656        &mut self,
657        output_arrays: Vec<Arc<NDArray>>,
658        param_updates: Vec<ParamUpdate>,
659        scatter_index: Option<usize>,
660        fallback_array: Option<&NDArray>,
661        elapsed_ms: f64,
662    ) -> ProcessOutput {
663        use asyn_rs::request::ParamSetValue;
664
665        let mut addr0: Vec<ParamSetValue> = Vec::new();
666        let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
667            std::collections::HashMap::new();
668
669        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
670            self.array_counter += 1;
671
672            // Fire array data interrupt directly (C EPICS pattern).
673            if let Some(param) = self.std_array_data_param {
674                use crate::ndarray::NDDataBuffer;
675                use asyn_rs::param::ParamValue;
676                let value = match &report_arr.data {
677                    NDDataBuffer::I8(v) => {
678                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
679                    }
680                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
681                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
682                    ))),
683                    NDDataBuffer::I16(v) => {
684                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
685                    }
686                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
687                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
688                    ))),
689                    NDDataBuffer::I32(v) => {
690                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
691                    }
692                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
693                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
694                    ))),
695                    NDDataBuffer::I64(v) => {
696                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
697                    }
698                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
699                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
700                    ))),
701                    NDDataBuffer::F32(v) => {
702                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
703                    }
704                    NDDataBuffer::F64(v) => {
705                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
706                    }
707                };
708                if let Some(value) = value {
709                    let ts = report_arr.timestamp.to_system_time();
710                    self.port_handle
711                        .interrupts()
712                        .notify(asyn_rs::interrupt::InterruptValue {
713                            reason: param,
714                            addr: 0,
715                            value,
716                            timestamp: ts,
717                            uint32_changed_mask: 0,
718                            ..Default::default()
719                        });
720                }
721            }
722
723            let info = report_arr.info();
724            // B11: read ColorMode / BayerPattern from the NDArray attributes
725            // (C++ beginProcessCallbacks). `info()` already resolves the
726            // ColorMode attribute when present; fall back to it for the param.
727            let color_mode = report_arr
728                .attributes
729                .get("ColorMode")
730                .and_then(|a| a.value.as_i64())
731                .map(|v| v as i32)
732                .unwrap_or(info.color_mode as i32);
733            let bayer_pattern = report_arr
734                .attributes
735                .get("BayerPattern")
736                .and_then(|a| a.value.as_i64())
737                .map(|v| v as i32)
738                .unwrap_or(0);
739
740            // G8: fire an int32-array callback on NDDimensions when the array
741            // dimensions change (C++ beginProcessCallbacks dimsPrev_).
742            let cur_dims: Vec<i32> = report_arr.dims.iter().map(|d| d.size as i32).collect();
743            if cur_dims != self.dims_prev {
744                self.dims_prev = cur_dims.clone();
745                self.port_handle
746                    .interrupts()
747                    .notify(asyn_rs::interrupt::InterruptValue {
748                        reason: self.ndarray_params.array_dimensions,
749                        addr: 0,
750                        value: asyn_rs::param::ParamValue::Int32Array(std::sync::Arc::from(
751                            cur_dims.as_slice(),
752                        )),
753                        timestamp: report_arr.timestamp.to_system_time(),
754                        uint32_changed_mask: 0,
755                        ..Default::default()
756                    });
757            }
758
759            addr0.extend([
760                ParamSetValue::Int32 {
761                    reason: self.ndarray_params.array_counter,
762                    addr: 0,
763                    value: self.array_counter,
764                },
765                ParamSetValue::Int32 {
766                    reason: self.ndarray_params.unique_id,
767                    addr: 0,
768                    value: report_arr.unique_id,
769                },
770                ParamSetValue::Int32 {
771                    reason: self.ndarray_params.n_dimensions,
772                    addr: 0,
773                    value: report_arr.dims.len() as i32,
774                },
775                ParamSetValue::Int32 {
776                    reason: self.ndarray_params.array_size_x,
777                    addr: 0,
778                    value: info.x_size as i32,
779                },
780                ParamSetValue::Int32 {
781                    reason: self.ndarray_params.array_size_y,
782                    addr: 0,
783                    value: info.y_size as i32,
784                },
785                ParamSetValue::Int32 {
786                    reason: self.ndarray_params.array_size_z,
787                    addr: 0,
788                    value: info.color_size as i32,
789                },
790                ParamSetValue::Int32 {
791                    reason: self.ndarray_params.array_size,
792                    addr: 0,
793                    value: info.total_bytes as i32,
794                },
795                ParamSetValue::Int32 {
796                    reason: self.ndarray_params.data_type,
797                    addr: 0,
798                    value: report_arr.data.data_type() as i32,
799                },
800                ParamSetValue::Int32 {
801                    reason: self.ndarray_params.color_mode,
802                    addr: 0,
803                    value: color_mode,
804                },
805                ParamSetValue::Int32 {
806                    reason: self.ndarray_params.bayer_pattern,
807                    addr: 0,
808                    value: bayer_pattern,
809                },
810                ParamSetValue::Float64 {
811                    reason: self.ndarray_params.timestamp_rbv,
812                    addr: 0,
813                    value: report_arr.timestamp.as_f64(),
814                },
815                ParamSetValue::Int32 {
816                    reason: self.ndarray_params.epics_ts_sec,
817                    addr: 0,
818                    value: report_arr.timestamp.sec as i32,
819                },
820                ParamSetValue::Int32 {
821                    reason: self.ndarray_params.epics_ts_nsec,
822                    addr: 0,
823                    value: report_arr.timestamp.nsec as i32,
824                },
825            ]);
826        }
827
828        addr0.push(ParamSetValue::Float64 {
829            reason: self.plugin_params.execution_time,
830            addr: 0,
831            value: elapsed_ms,
832        });
833
834        // ArrayRate_RBV is computed by a calc record in the DB template
835        // (SCAN "1 second", reading ArrayCounter_RBV delta), not in Rust.
836
837        // Plugin-specific param updates.
838        for update in &param_updates {
839            match update {
840                ParamUpdate::Int32 {
841                    reason,
842                    addr,
843                    value,
844                } => {
845                    let pv = ParamSetValue::Int32 {
846                        reason: *reason,
847                        addr: *addr,
848                        value: *value,
849                    };
850                    if *addr == 0 {
851                        addr0.push(pv);
852                    } else {
853                        extra.entry(*addr).or_default().push(pv);
854                    }
855                }
856                ParamUpdate::Float64 {
857                    reason,
858                    addr,
859                    value,
860                } => {
861                    let pv = ParamSetValue::Float64 {
862                        reason: *reason,
863                        addr: *addr,
864                        value: *value,
865                    };
866                    if *addr == 0 {
867                        addr0.push(pv);
868                    } else {
869                        extra.entry(*addr).or_default().push(pv);
870                    }
871                }
872                ParamUpdate::Octet {
873                    reason,
874                    addr,
875                    value,
876                } => {
877                    let pv = ParamSetValue::Octet {
878                        reason: *reason,
879                        addr: *addr,
880                        value: value.clone(),
881                    };
882                    if *addr == 0 {
883                        addr0.push(pv);
884                    } else {
885                        extra.entry(*addr).or_default().push(pv);
886                    }
887                }
888                ParamUpdate::Float64Array {
889                    reason,
890                    addr,
891                    value,
892                } => {
893                    let pv = ParamSetValue::Float64Array {
894                        reason: *reason,
895                        addr: *addr,
896                        value: value.clone(),
897                    };
898                    if *addr == 0 {
899                        addr0.push(pv);
900                    } else {
901                        extra.entry(*addr).or_default().push(pv);
902                    }
903                }
904            }
905        }
906
907        ProcessOutput {
908            arrays: output_arrays,
909            scatter_index,
910            batch: ParamBatch { addr0, extra },
911        }
912    }
913}
914
915/// Output from processing: arrays to publish + param batch to flush.
916struct ProcessOutput {
917    arrays: Vec<Arc<NDArray>>,
918    scatter_index: Option<usize>,
919    batch: ParamBatch,
920}
921
922impl ProcessOutput {
923    /// Publish arrays to downstream senders (async, concurrent fan-out).
924    ///
925    /// Each array is published to all senders concurrently (independent
926    /// backpressure per sender). Arrays are published in order — the next
927    /// array's fan-out starts only after the previous one completes.
928    async fn publish_arrays(&self, senders: &[NDArraySender]) {
929        for arr in &self.arrays {
930            if let Some(idx) = self.scatter_index {
931                if let Some(sender) = senders.get(idx % senders.len().max(1)) {
932                    sender.publish(arr.clone()).await;
933                }
934            } else {
935                let futs = senders.iter().map(|s| s.publish(arr.clone()));
936                futures_util::future::join_all(futs).await;
937            }
938        }
939    }
940}
941
942/// Collected param updates ready to be flushed to the actor.
943/// Produced by `build_publish_batch()`, consumed by async `flush()`.
944struct ParamBatch {
945    addr0: Vec<asyn_rs::request::ParamSetValue>,
946    extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
947}
948
949impl ParamBatch {
950    fn empty() -> Self {
951        Self {
952            addr0: Vec::new(),
953            extra: std::collections::HashMap::new(),
954        }
955    }
956
957    fn merge(&mut self, other: ParamBatch) {
958        self.addr0.extend(other.addr0);
959        for (addr, updates) in other.extra {
960            self.extra.entry(addr).or_default().extend(updates);
961        }
962    }
963
964    /// Flush via reliable async enqueue. Call from async context.
965    async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
966        if !self.addr0.is_empty() {
967            if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
968                eprintln!("plugin param flush error (addr 0): {e}");
969            }
970        }
971        for (addr, updates) in self.extra {
972            if let Err(e) = port.set_params_and_notify(addr, updates).await {
973                eprintln!("plugin param flush error (addr {addr}): {e}");
974            }
975        }
976    }
977}
978
979/// PortDriver implementation for a plugin's control plane.
980#[allow(dead_code)]
981pub struct PluginPortDriver {
982    base: PortDriverBase,
983    ndarray_params: NDArrayDriverParams,
984    plugin_params: PluginBaseParams,
985    param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
986    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
987    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
988    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
989    std_array_data_param: Option<usize>,
990}
991
992impl PluginPortDriver {
993    fn new<P: NDPluginProcess>(
994        port_name: &str,
995        plugin_type_name: &str,
996        queue_size: usize,
997        ndarray_port: &str,
998        max_addr: usize,
999        param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
1000        processor: &mut P,
1001        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
1002    ) -> AsynResult<Self> {
1003        let mut base = PortDriverBase::new(
1004            port_name,
1005            max_addr,
1006            PortFlags {
1007                can_block: true,
1008                ..Default::default()
1009            },
1010        );
1011
1012        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
1013        let plugin_params = PluginBaseParams::create(&mut base)?;
1014
1015        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
1016        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
1017        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
1018        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
1019        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
1020        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
1021        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
1022        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
1023        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
1024        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
1025        base.set_int32_param(ndarray_params.capture, 0, 0)?;
1026        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
1027        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
1028        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
1029        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
1030        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
1031        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
1032        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
1033        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
1034        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
1035        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
1036
1037        // Set plugin identity params
1038        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
1039        base.set_string_param(
1040            ndarray_params.ad_core_version,
1041            0,
1042            env!("CARGO_PKG_VERSION").into(),
1043        )?;
1044        base.set_string_param(
1045            ndarray_params.driver_version,
1046            0,
1047            env!("CARGO_PKG_VERSION").into(),
1048        )?;
1049        if !ndarray_port.is_empty() {
1050            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
1051        }
1052
1053        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
1054        let std_array_data_param = if array_data.is_some() {
1055            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
1056        } else {
1057            None
1058        };
1059
1060        // Let the processor register its plugin-specific params
1061        processor.register_params(&mut base)?;
1062
1063        Ok(Self {
1064            base,
1065            ndarray_params,
1066            plugin_params,
1067            param_change_tx,
1068            array_data,
1069            std_array_data_param,
1070        })
1071    }
1072}
1073
1074/// Copy source slice directly into destination buffer, returning elements copied.
1075fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
1076    let n = src.len().min(dst.len());
1077    dst[..n].copy_from_slice(&src[..n]);
1078    n
1079}
1080
1081/// Convert and copy source slice into destination buffer element-by-element.
1082fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
1083where
1084    S: CastToF64 + Copy,
1085    D: CastFromF64 + Copy,
1086{
1087    let n = src.len().min(dst.len());
1088    for i in 0..n {
1089        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
1090    }
1091    n
1092}
1093
1094/// Cast an integer source element to an integer destination element with C
1095/// cast semantics. C++ `NDArrayPool::convert` (`NDArrayPool.cpp:388`,
1096/// `convertType`: `*pDataOut++ = (dataTypeOut)(*pDataIn++)`; and `:466`,
1097/// `convertDim`) performs a plain C cast between integer types. A C cast:
1098///   - same-width sign change is a bitwise reinterpret
1099///     (`(epicsInt8)(epicsUInt8)255 == -1`);
1100///   - narrowing truncates to the low bits, wrapping
1101///     (`(epicsInt8)(epicsUInt16)300 == 44`);
1102///   - widening sign/zero-extends exactly.
1103///
1104/// Rust's `as` between integer types implements exactly these semantics. The
1105/// f64 round-trip in [`copy_convert`] does NOT: it saturates on narrowing
1106/// (`300.0 as i8 == 127`), diverging from C++. So every integer-source ->
1107/// integer-target NDArray array read must go through this C-cast path, not
1108/// `copy_convert`.
1109trait CCastTo<D> {
1110    fn ccast(self) -> D;
1111}
1112macro_rules! impl_ccast {
1113    ( $src:ty => $( $dst:ty ),+ ) => {
1114        $(
1115            impl CCastTo<$dst> for $src {
1116                #[inline]
1117                fn ccast(self) -> $dst {
1118                    self as $dst
1119                }
1120            }
1121        )+
1122    };
1123}
1124impl_ccast!(i8 => i16, i32, i64);
1125impl_ccast!(u8 => i8, i16, i32, i64);
1126impl_ccast!(i16 => i8, i32, i64);
1127impl_ccast!(u16 => i8, i16, i32, i64);
1128impl_ccast!(i32 => i8, i16, i64);
1129impl_ccast!(u32 => i8, i16, i32, i64);
1130impl_ccast!(i64 => i8, i16, i32);
1131impl_ccast!(u64 => i8, i16, i32, i64);
1132
1133/// Copy an integer source slice into an integer destination buffer using C
1134/// cast semantics (see [`CCastTo`]) — truncating on narrowing, never
1135/// saturating.
1136fn copy_ccast<S, D>(src: &[S], dst: &mut [D]) -> usize
1137where
1138    S: CCastTo<D> + Copy,
1139    D: Copy,
1140{
1141    let n = src.len().min(dst.len());
1142    for i in 0..n {
1143        dst[i] = src[i].ccast();
1144    }
1145    n
1146}
1147
1148/// Helper trait for `as f64` casts (handles lossy conversions like i64/u64).
1149trait CastToF64 {
1150    fn cast_to_f64(self) -> f64;
1151}
1152
1153impl CastToF64 for i8 {
1154    fn cast_to_f64(self) -> f64 {
1155        self as f64
1156    }
1157}
1158impl CastToF64 for u8 {
1159    fn cast_to_f64(self) -> f64 {
1160        self as f64
1161    }
1162}
1163impl CastToF64 for i16 {
1164    fn cast_to_f64(self) -> f64 {
1165        self as f64
1166    }
1167}
1168impl CastToF64 for u16 {
1169    fn cast_to_f64(self) -> f64 {
1170        self as f64
1171    }
1172}
1173impl CastToF64 for i32 {
1174    fn cast_to_f64(self) -> f64 {
1175        self as f64
1176    }
1177}
1178impl CastToF64 for u32 {
1179    fn cast_to_f64(self) -> f64 {
1180        self as f64
1181    }
1182}
1183impl CastToF64 for i64 {
1184    fn cast_to_f64(self) -> f64 {
1185        self as f64
1186    }
1187}
1188impl CastToF64 for u64 {
1189    fn cast_to_f64(self) -> f64 {
1190        self as f64
1191    }
1192}
1193impl CastToF64 for f32 {
1194    fn cast_to_f64(self) -> f64 {
1195        self as f64
1196    }
1197}
1198impl CastToF64 for f64 {
1199    fn cast_to_f64(self) -> f64 {
1200        self
1201    }
1202}
1203
1204/// Helper trait for `as` casts from f64.
1205trait CastFromF64 {
1206    fn cast_from_f64(v: f64) -> Self;
1207}
1208
1209impl CastFromF64 for i8 {
1210    fn cast_from_f64(v: f64) -> Self {
1211        v as i8
1212    }
1213}
1214impl CastFromF64 for i16 {
1215    fn cast_from_f64(v: f64) -> Self {
1216        v as i16
1217    }
1218}
1219impl CastFromF64 for i32 {
1220    fn cast_from_f64(v: f64) -> Self {
1221        v as i32
1222    }
1223}
1224impl CastFromF64 for i64 {
1225    fn cast_from_f64(v: f64) -> Self {
1226        v as i64
1227    }
1228}
1229impl CastFromF64 for f32 {
1230    fn cast_from_f64(v: f64) -> Self {
1231        v as f32
1232    }
1233}
1234impl CastFromF64 for f64 {
1235    fn cast_from_f64(v: f64) -> Self {
1236        v
1237    }
1238}
1239
1240/// Copy NDArray data into the output buffer with type conversion.
1241/// Returns the number of elements copied, or 0 if no data is available.
1242macro_rules! impl_read_array {
1243    (
1244        $self:expr, $buf:expr, $direct_variant:ident,
1245        ccast: [ $( $ccast_variant:ident ),* ],
1246        convert: [ $( $variant:ident ),* ]
1247    ) => {{
1248        use crate::ndarray::NDDataBuffer;
1249        let handle = match &$self.array_data {
1250            Some(h) => h,
1251            None => return Ok(0),
1252        };
1253        let guard = handle.lock();
1254        let array = match &*guard {
1255            Some(a) => a,
1256            None => return Ok(0),
1257        };
1258        let n = match &array.data {
1259            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
1260            $( NDDataBuffer::$ccast_variant(v) => copy_ccast(v, $buf), )*
1261            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
1262        };
1263        Ok(n)
1264    }};
1265}
1266
1267impl PortDriver for PluginPortDriver {
1268    fn base(&self) -> &PortDriverBase {
1269        &self.base
1270    }
1271
1272    fn base_mut(&mut self) -> &mut PortDriverBase {
1273        &mut self.base
1274    }
1275
1276    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
1277        let reason = user.reason;
1278        let addr = user.addr;
1279        self.base.set_int32_param(reason, addr, value)?;
1280        self.base.call_param_callbacks(addr)?;
1281        // B14: reliable send on an unbounded channel — never drop param changes.
1282        let _ = self
1283            .param_change_tx
1284            .send((reason, addr, ParamChangeValue::Int32(value)));
1285        Ok(())
1286    }
1287
1288    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
1289        let reason = user.reason;
1290        let addr = user.addr;
1291        self.base.set_float64_param(reason, addr, value)?;
1292        self.base.call_param_callbacks(addr)?;
1293        let _ = self
1294            .param_change_tx
1295            .send((reason, addr, ParamChangeValue::Float64(value)));
1296        Ok(())
1297    }
1298
1299    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
1300        let reason = user.reason;
1301        let addr = user.addr;
1302        let s = String::from_utf8_lossy(data).into_owned();
1303        self.base.set_string_param(reason, addr, s.clone())?;
1304        self.base.call_param_callbacks(addr)?;
1305        let _ = self
1306            .param_change_tx
1307            .send((reason, addr, ParamChangeValue::Octet(s)));
1308        Ok(())
1309    }
1310
1311    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
1312        // Every integer source -> i8 is a C cast (truncating, per C++
1313        // NDArrayPool.cpp:388); float sources keep the numeric f64 conversion.
1314        impl_read_array!(
1315            self, buf, I8,
1316            ccast: [U8, I16, U16, I32, U32, I64, U64],
1317            convert: [F32, F64]
1318        )
1319    }
1320
1321    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
1322        impl_read_array!(
1323            self, buf, I16,
1324            ccast: [I8, U8, U16, I32, U32, I64, U64],
1325            convert: [F32, F64]
1326        )
1327    }
1328
1329    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1330        impl_read_array!(
1331            self, buf, I32,
1332            ccast: [I8, U8, I16, U16, U32, I64, U64],
1333            convert: [F32, F64]
1334        )
1335    }
1336
1337    fn read_int64_array(&mut self, _user: &AsynUser, buf: &mut [i64]) -> AsynResult<usize> {
1338        impl_read_array!(
1339            self, buf, I64,
1340            ccast: [I8, U8, I16, U16, I32, U32, U64],
1341            convert: [F32, F64]
1342        )
1343    }
1344
1345    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1346        impl_read_array!(
1347            self, buf, F32,
1348            ccast: [],
1349            convert: [I8, U8, I16, U16, I32, U32, I64, U64, F64]
1350        )
1351    }
1352
1353    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1354        impl_read_array!(
1355            self, buf, F64,
1356            ccast: [],
1357            convert: [I8, U8, I16, U16, I32, U32, I64, U64, F32]
1358        )
1359    }
1360}
1361
1362/// Handle to a running plugin runtime. Provides access to sender and port handle.
1363#[derive(Clone)]
1364pub struct PluginRuntimeHandle {
1365    port_runtime: PortRuntimeHandle,
1366    array_sender: NDArraySender,
1367    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
1368    port_name: String,
1369    pub ndarray_params: NDArrayDriverParams,
1370    pub plugin_params: PluginBaseParams,
1371}
1372
1373impl PluginRuntimeHandle {
1374    pub fn port_runtime(&self) -> &PortRuntimeHandle {
1375        &self.port_runtime
1376    }
1377
1378    pub fn array_sender(&self) -> &NDArraySender {
1379        &self.array_sender
1380    }
1381
1382    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
1383        &self.array_output
1384    }
1385
1386    pub fn port_name(&self) -> &str {
1387        &self.port_name
1388    }
1389}
1390
1391/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
1392///
1393/// Returns:
1394/// - `PluginRuntimeHandle` for wiring and control
1395/// - `PortRuntimeHandle` for param I/O
1396/// - `JoinHandle` for the data processing thread
1397pub fn create_plugin_runtime<P: NDPluginProcess>(
1398    port_name: &str,
1399    processor: P,
1400    pool: Arc<NDArrayPool>,
1401    queue_size: usize,
1402    ndarray_port: &str,
1403    wiring: Arc<WiringRegistry>,
1404) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1405    create_plugin_runtime_multi_addr(
1406        port_name,
1407        processor,
1408        pool,
1409        queue_size,
1410        ndarray_port,
1411        wiring,
1412        1,
1413    )
1414}
1415
1416/// Create a plugin runtime with multi-addr support.
1417///
1418/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
1419pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1420    port_name: &str,
1421    mut processor: P,
1422    pool: Arc<NDArrayPool>,
1423    queue_size: usize,
1424    ndarray_port: &str,
1425    wiring: Arc<WiringRegistry>,
1426    max_addr: usize,
1427) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1428    // Param change channel (control plane -> data plane)
1429    // B14: unbounded so control-plane param changes (e.g. autosave restoring
1430    // hundreds of PVs at IOC init) are never silently dropped before the
1431    // data plane sees them.
1432    let (param_tx, param_rx) =
1433        tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
1434
1435    // Capture plugin type and array data handle before mutable borrow
1436    let plugin_type_name = processor.plugin_type().to_string();
1437    let compression_aware = processor.compression_aware();
1438    let array_data = processor.array_data_handle();
1439
1440    // Create the port driver for control plane
1441    let driver = PluginPortDriver::new(
1442        port_name,
1443        &plugin_type_name,
1444        queue_size,
1445        ndarray_port,
1446        max_addr,
1447        param_tx,
1448        &mut processor,
1449        array_data,
1450    )
1451    .expect("failed to create plugin port driver");
1452
1453    let ndarray_params = driver.ndarray_params;
1454    let plugin_params = driver.plugin_params;
1455    let std_array_data_param = driver.std_array_data_param;
1456
1457    // Create port runtime (actor thread for param I/O)
1458    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1459
1460    // Clone port handle for the data thread to write params back
1461    let port_handle = port_runtime.port_handle().clone();
1462
1463    // Array channel (data plane)
1464    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1465
1466    // Shared mode flags
1467    let enabled = Arc::new(AtomicBool::new(false));
1468    let blocking_mode = Arc::new(AtomicBool::new(false));
1469
1470    // Shared processor (accessible from data thread)
1471    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1472    let array_output_for_handle = array_output.clone();
1473    // B13/G6: register this plugin's output so the WiringRegistry is the
1474    // single source of truth for runtime rewiring (PluginManager::add_plugin
1475    // would also register it, but direct callers must not bypass the
1476    // registry). Registered under every address in 0..max_addr so a
1477    // downstream plugin can select a non-zero NDArrayAddr.
1478    wiring.register_output_addrs(port_name, max_addr, array_output.clone());
1479    // G1/B1: the DroppedArrays counter is owned by this plugin and shared with
1480    // every upstream sender so full-queue drops on our input queue are counted.
1481    let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
1482    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1483        processor,
1484        output: array_output,
1485        pool,
1486        ndarray_params,
1487        plugin_params,
1488        port_handle,
1489        array_counter: 0,
1490        std_array_data_param,
1491        min_callback_time: 0.0,
1492        last_process_time: None,
1493        sort_mode: 0,
1494        sort_time: 0.0,
1495        sort_size: 10,
1496        sort_buffer: SortBuffer::new(),
1497        dropped_arrays: dropped_arrays_counter,
1498        compression_aware,
1499        max_byte_rate: 0.0,
1500        throttler: super::throttler::Throttler::new(0.0),
1501        prev_input_array: None,
1502        dims_prev: Vec::new(),
1503        nd_array_addr: 0,
1504        max_threads: 1,
1505        num_threads: 1,
1506    }));
1507
1508    let data_enabled = enabled.clone();
1509    let data_blocking = blocking_mode.clone();
1510
1511    let mut array_sender = array_sender;
1512    array_sender.set_mode_flags(enabled, blocking_mode);
1513
1514    // Capture wiring info for data loop
1515    let sender_port_name = port_name.to_string();
1516    let initial_upstream = ndarray_port.to_string();
1517
1518    // Spawn data processing thread
1519    let data_jh = thread::Builder::new()
1520        .name(format!("plugin-data-{port_name}"))
1521        .spawn(move || {
1522            plugin_data_loop(
1523                shared,
1524                array_rx,
1525                param_rx,
1526                plugin_params,
1527                ndarray_params.array_counter,
1528                data_enabled,
1529                data_blocking,
1530                sender_port_name,
1531                initial_upstream,
1532                wiring,
1533            );
1534        })
1535        .expect("failed to spawn plugin data thread");
1536
1537    let handle = PluginRuntimeHandle {
1538        port_runtime,
1539        array_sender,
1540        array_output: array_output_for_handle,
1541        port_name: port_name.to_string(),
1542        ndarray_params,
1543        plugin_params,
1544    };
1545
1546    (handle, data_jh)
1547}
1548
1549/// Build a param batch reporting the input-queue depth.
1550///
1551/// `QUEUE_SIZE` = total capacity, `QUEUE_FREE` = free slots. G2: the param is
1552/// named `QUEUE_FREE` and the reconciled semantics are *free slots*, matching
1553/// C++ `NDPluginDriverQueueFree = queueSize - pending()`.
1554fn queue_status_batch(
1555    plugin_params: &PluginBaseParams,
1556    max_capacity: usize,
1557    free: i32,
1558) -> ParamBatch {
1559    use asyn_rs::request::ParamSetValue;
1560    ParamBatch {
1561        addr0: vec![
1562            ParamSetValue::Int32 {
1563                reason: plugin_params.queue_size,
1564                addr: 0,
1565                value: max_capacity as i32,
1566            },
1567            ParamSetValue::Int32 {
1568                reason: plugin_params.queue_use,
1569                addr: 0,
1570                value: free,
1571            },
1572        ],
1573        extra: std::collections::HashMap::new(),
1574    }
1575}
1576
1577/// Write a validated/clamped int32 value back into the param library so the
1578/// RBV reflects the accepted value (G4 NumThreads/MaxThreads clamping).
1579async fn clamp_writeback(port: &PortHandle, reason: usize, value: i32) {
1580    use asyn_rs::request::ParamSetValue;
1581    let _ = port
1582        .set_params_and_notify(
1583            0,
1584            vec![ParamSetValue::Int32 {
1585                reason,
1586                addr: 0,
1587                value,
1588            }],
1589        )
1590        .await;
1591}
1592
1593fn plugin_data_loop<P: NDPluginProcess>(
1594    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1595    mut array_rx: NDArrayReceiver,
1596    mut param_rx: tokio::sync::mpsc::UnboundedReceiver<(usize, i32, ParamChangeValue)>,
1597    plugin_params: PluginBaseParams,
1598    array_counter_reason: usize,
1599    enabled: Arc<AtomicBool>,
1600    blocking_mode: Arc<AtomicBool>,
1601    sender_port_name: String,
1602    initial_upstream: String,
1603    wiring: Arc<WiringRegistry>,
1604) {
1605    let enable_callbacks_reason = plugin_params.enable_callbacks;
1606    let blocking_callbacks_reason = plugin_params.blocking_callbacks;
1607    let min_callback_time_reason = plugin_params.min_callback_time;
1608    let sort_mode_reason = plugin_params.sort_mode;
1609    let sort_time_reason = plugin_params.sort_time;
1610    let sort_size_reason = plugin_params.sort_size;
1611    let nd_array_port_reason = plugin_params.nd_array_port;
1612    let nd_array_addr_reason = plugin_params.nd_array_addr;
1613    let process_plugin_reason = plugin_params.process_plugin;
1614    let max_byte_rate_reason = plugin_params.max_byte_rate;
1615    let num_threads_reason = plugin_params.num_threads;
1616    let max_threads_reason = plugin_params.max_threads;
1617    // G6: the upstream connection is keyed by (port, addr). `current_upstream`
1618    // is the base port name; `current_addr` is the selected NDArrayAddr; the
1619    // effective WiringRegistry key is computed by `upstream_key`.
1620    let mut current_upstream = initial_upstream;
1621    let mut current_addr: i32 = 0;
1622    let rt = tokio::runtime::Builder::new_current_thread()
1623        .enable_all()
1624        .build()
1625        .unwrap();
1626    rt.block_on(async {
1627        // Sort flush timer — starts disabled (very long interval).
1628        // Re-created when sort_time changes.
1629        let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1630        let mut sort_flush_active = false;
1631        // Last published QueueFree value — only flush the queue params when it
1632        // changes, so a steady queue depth does not spam param callbacks.
1633        let mut last_queue_free: Option<i32> = None;
1634
1635        loop {
1636            tokio::select! {
1637                msg = array_rx.recv_msg() => {
1638                    match msg {
1639                        Some(msg) => {
1640                            // B6: quiesce is synchronous — if callbacks were
1641                            // disabled (the param arm flips `enabled` before
1642                            // any further array message is handled), drop the
1643                            // array here without processing.
1644                            if !enabled.load(Ordering::Acquire) {
1645                                continue;
1646                            }
1647                            // Process array and collect output (sync, under lock).
1648                            let (process_output, senders, port) = {
1649                                let mut guard = shared.lock();
1650                                // G3: a non-compression-aware plugin must drop
1651                                // a compressed array (codec set) and count it
1652                                // (C++ driverCallback NDPluginDriver.cpp:383-394).
1653                                let compressed = msg.array.codec.is_some();
1654                                let output = if compressed && !guard.compression_aware {
1655                                    guard
1656                                        .dropped_arrays
1657                                        .fetch_add(1, Ordering::AcqRel);
1658                                    Some(guard.dropped_arrays_only_batch())
1659                                } else {
1660                                    // R2/G5: process_and_publish caches the
1661                                    // input array for ProcessPlugin only after
1662                                    // the MinCallbackTime gate passes — a
1663                                    // throttled frame is never cached.
1664                                    guard.process_and_publish(&msg.array)
1665                                };
1666                                let senders = guard.output.lock().senders_clone();
1667                                let port = guard.port_handle.clone();
1668                                (output, senders, port)
1669                            };
1670                            // G2: update QueueSize/QueueFree from the channel
1671                            // depth (C++ NDPluginDriver.cpp:512-513). QueueFree
1672                            // = max_capacity - pending. Only flush when the
1673                            // value changed to avoid no-op param callbacks.
1674                            let max_cap = array_rx.max_capacity();
1675                            let free = max_cap.saturating_sub(array_rx.pending()) as i32;
1676                            let queue_batch = if last_queue_free != Some(free) {
1677                                last_queue_free = Some(free);
1678                                Some(queue_status_batch(&plugin_params, max_cap, free))
1679                            } else {
1680                                None
1681                            };
1682                            // msg dropped here → completion signaled (if tracked)
1683                            // Publish arrays and flush params outside the lock, in async context.
1684                            if let Some(po) = process_output {
1685                                po.publish_arrays(&senders).await;
1686                                po.batch.flush(&port).await;
1687                            }
1688                            if let Some(qb) = queue_batch {
1689                                qb.flush(&port).await;
1690                            }
1691                        }
1692                        None => break,
1693                    }
1694                }
1695                param = param_rx.recv() => {
1696                    match param {
1697                        Some((reason, addr, value)) => {
1698                            if reason == enable_callbacks_reason {
1699                                let on = value.as_i32() != 0;
1700                                enabled.store(on, Ordering::Release);
1701                                // B6: disabling releases the cached input array
1702                                // (C++ writeInt32 NDPluginDriver.cpp:712-722).
1703                                if !on {
1704                                    shared.lock().prev_input_array = None;
1705                                }
1706                            }
1707                            if reason == blocking_callbacks_reason {
1708                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1709                            }
1710                            // Handle MinCallbackTime param change
1711                            if reason == min_callback_time_reason {
1712                                shared.lock().min_callback_time = value.as_f64();
1713                            }
1714                            // G7: MaxByteRate change resets the output throttler
1715                            // (C++ writeFloat64 NDPluginDriver.cpp:788-790).
1716                            if reason == max_byte_rate_reason {
1717                                let rate = value.as_f64();
1718                                let mut guard = shared.lock();
1719                                guard.max_byte_rate = rate;
1720                                guard.throttler.reset(rate);
1721                            }
1722                            // G4: NumThreads / MaxThreads are validated and
1723                            // clamped on write. The Rust port is intentionally
1724                            // single-threaded per plugin (one tokio task) — see
1725                            // the module note — so NumThreads is clamped to
1726                            // [1, MaxThreads] and the clamped value written back
1727                            // rather than spawning a worker pool.
1728                            if reason == max_threads_reason {
1729                                // Scope the guard so it is released before await.
1730                                let (port, clamped, mt) = {
1731                                    let mut guard = shared.lock();
1732                                    guard.max_threads = value.as_i32().max(1);
1733                                    let clamped =
1734                                        guard.num_threads.clamp(1, guard.max_threads);
1735                                    guard.num_threads = clamped;
1736                                    (guard.port_handle.clone(), clamped, guard.max_threads)
1737                                };
1738                                clamp_writeback(&port, num_threads_reason, clamped).await;
1739                                clamp_writeback(&port, max_threads_reason, mt).await;
1740                            }
1741                            if reason == num_threads_reason {
1742                                let (port, clamped) = {
1743                                    let mut guard = shared.lock();
1744                                    let clamped =
1745                                        value.as_i32().clamp(1, guard.max_threads.max(1));
1746                                    guard.num_threads = clamped;
1747                                    (guard.port_handle.clone(), clamped)
1748                                };
1749                                clamp_writeback(&port, num_threads_reason, clamped).await;
1750                            }
1751                            // G6: NDArrayAddr selects a source address of a
1752                            // multi-address driver — reconnect on change
1753                            // (C++ writeInt32 NDPluginDriver.cpp:724-728).
1754                            if reason == nd_array_addr_reason {
1755                                let new_addr = value.as_i32();
1756                                if new_addr != current_addr {
1757                                    let old_key = upstream_key(&current_upstream, current_addr);
1758                                    let new_key = upstream_key(&current_upstream, new_addr);
1759                                    shared.lock().nd_array_addr = new_addr;
1760                                    match wiring.rewire_by_name(
1761                                        &sender_port_name,
1762                                        &old_key,
1763                                        &new_key,
1764                                    ) {
1765                                        Ok(()) => current_addr = new_addr,
1766                                        Err(e) => {
1767                                            eprintln!("NDArrayAddr reconnect failed: {e}");
1768                                            shared.lock().nd_array_addr = current_addr;
1769                                        }
1770                                    }
1771                                }
1772                            }
1773                            // G5: ProcessPlugin re-injects the cached input
1774                            // array (C++ writeInt32 NDPluginDriver.cpp:739-746).
1775                            if reason == process_plugin_reason && value.as_i32() != 0 {
1776                                let (process_output, senders, port) = {
1777                                    let mut guard = shared.lock();
1778                                    let output = guard.process_plugin();
1779                                    let senders = guard.output.lock().senders_clone();
1780                                    let port = guard.port_handle.clone();
1781                                    (output, senders, port)
1782                                };
1783                                if let Some(po) = process_output {
1784                                    po.publish_arrays(&senders).await;
1785                                    po.batch.flush(&port).await;
1786                                } else {
1787                                    eprintln!(
1788                                        "plugin {sender_port_name}: ProcessPlugin \
1789                                         requested but no input array cached"
1790                                    );
1791                                }
1792                            }
1793                            // B12: a control-plane write of ArrayCounter resets
1794                            // the working counter (C++ keeps NDArrayCounter in
1795                            // the param library; beginProcessCallbacks reads it).
1796                            if reason == array_counter_reason {
1797                                shared.lock().array_counter = value.as_i32();
1798                            }
1799                            // Handle sort param changes
1800                            if reason == sort_mode_reason {
1801                                let mode = value.as_i32();
1802                                // Scope the guard so clippy can verify the lock
1803                                // is released before any await.
1804                                let flush_work = {
1805                                    let mut guard = shared.lock();
1806                                    guard.sort_mode = mode;
1807                                    if mode == 0 {
1808                                        let output = guard.flush_sort_buffer();
1809                                        let senders = guard.output.lock().senders_clone();
1810                                        let port = guard.port_handle.clone();
1811                                        sort_flush_active = false;
1812                                        Some((output, senders, port))
1813                                    } else {
1814                                        sort_flush_active = guard.sort_time > 0.0;
1815                                        if sort_flush_active {
1816                                            let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1817                                            sort_flush_interval = tokio::time::interval(dur);
1818                                        }
1819                                        None
1820                                    }
1821                                };
1822                                if let Some((output, senders, port)) = flush_work {
1823                                    output.publish_arrays(&senders).await;
1824                                    output.batch.flush(&port).await;
1825                                }
1826                            }
1827                            if reason == sort_time_reason {
1828                                let t = value.as_f64();
1829                                let mut guard = shared.lock();
1830                                guard.sort_time = t;
1831                                if guard.sort_mode != 0 && t > 0.0 {
1832                                    sort_flush_active = true;
1833                                    let dur = std::time::Duration::from_secs_f64(t);
1834                                    sort_flush_interval = tokio::time::interval(dur);
1835                                } else {
1836                                    sort_flush_active = false;
1837                                }
1838                                drop(guard);
1839                            }
1840                            if reason == sort_size_reason {
1841                                shared.lock().sort_size = value.as_i32();
1842                            }
1843                            // Handle NDArrayPort rewiring — keyed by (port, addr).
1844                            if reason == nd_array_port_reason {
1845                                if let Some(new_port) = value.as_string() {
1846                                    if new_port != current_upstream {
1847                                        let old_key =
1848                                            upstream_key(&current_upstream, current_addr);
1849                                        let new_key = upstream_key(new_port, current_addr);
1850                                        match wiring.rewire_by_name(
1851                                            &sender_port_name,
1852                                            &old_key,
1853                                            &new_key,
1854                                        ) {
1855                                            Ok(()) => current_upstream = new_port.to_string(),
1856                                            Err(e) => {
1857                                                eprintln!("NDArrayPort rewire failed: {e}")
1858                                            }
1859                                        }
1860                                    }
1861                                }
1862                            }
1863                            let snapshot = PluginParamSnapshot {
1864                                enable_callbacks: enabled.load(Ordering::Acquire),
1865                                reason,
1866                                addr,
1867                                value,
1868                            };
1869                            let (process_output, senders, port) = {
1870                                let mut guard = shared.lock();
1871                                let t0 = std::time::Instant::now();
1872                                let result = guard.processor.on_param_change(reason, &snapshot);
1873                                let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1874                                let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1875                                    Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
1876                                } else {
1877                                    None
1878                                };
1879                                let senders = guard.output.lock().senders_clone();
1880                                (output, senders, guard.port_handle.clone())
1881                            };
1882                            if let Some(po) = process_output {
1883                                po.publish_arrays(&senders).await;
1884                                po.batch.flush(&port).await;
1885                            }
1886                        }
1887                        None => break,
1888                    }
1889                }
1890                _ = sort_flush_interval.tick(), if sort_flush_active => {
1891                    // B3: drain head-first while contiguous or past the
1892                    // staleness deadline — NOT the whole buffer.
1893                    let (output, senders, port) = {
1894                        let mut guard = shared.lock();
1895                        let output = guard.tick_sort_buffer();
1896                        let senders = guard.output.lock().senders_clone();
1897                        let port = guard.port_handle.clone();
1898                        (output, senders, port)
1899                    };
1900                    output.publish_arrays(&senders).await;
1901                    output.batch.flush(&port).await;
1902                }
1903            }
1904        }
1905    });
1906}
1907
1908/// Connect a downstream plugin's sender to a plugin runtime's output.
1909///
1910/// B13: the upstream's `array_output` is the same `Arc` that every
1911/// `create_plugin_runtime*` entry point registers in the `WiringRegistry`, so
1912/// adding a sender here mutates the registry-tracked output — the registry
1913/// remains the single source of truth for `rewire_by_name`.
1914pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1915    upstream.array_output().lock().add(downstream_sender);
1916}
1917
1918/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
1919pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1920    port_name: &str,
1921    mut processor: P,
1922    pool: Arc<NDArrayPool>,
1923    queue_size: usize,
1924    output: NDArrayOutput,
1925    ndarray_port: &str,
1926    wiring: Arc<WiringRegistry>,
1927) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1928    // B14: unbounded so control-plane param changes (e.g. autosave restoring
1929    // hundreds of PVs at IOC init) are never silently dropped before the
1930    // data plane sees them.
1931    let (param_tx, param_rx) =
1932        tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
1933
1934    let plugin_type_name = processor.plugin_type().to_string();
1935    let compression_aware = processor.compression_aware();
1936    let array_data = processor.array_data_handle();
1937    let driver = PluginPortDriver::new(
1938        port_name,
1939        &plugin_type_name,
1940        queue_size,
1941        ndarray_port,
1942        1,
1943        param_tx,
1944        &mut processor,
1945        array_data,
1946    )
1947    .expect("failed to create plugin port driver");
1948
1949    let ndarray_params = driver.ndarray_params;
1950    let plugin_params = driver.plugin_params;
1951    let std_array_data_param = driver.std_array_data_param;
1952
1953    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1954
1955    let port_handle = port_runtime.port_handle().clone();
1956
1957    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1958
1959    let enabled = Arc::new(AtomicBool::new(false));
1960    let blocking_mode = Arc::new(AtomicBool::new(false));
1961
1962    let array_output = Arc::new(parking_lot::Mutex::new(output));
1963    let array_output_for_handle = array_output.clone();
1964    // B13: register this plugin's output so the WiringRegistry is the single
1965    // source of truth — an output created via this entry point is otherwise
1966    // invisible to runtime rewiring.
1967    wiring.register_output(port_name, array_output.clone());
1968    // G1/B1: DroppedArrays counter shared with upstream senders.
1969    let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
1970    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1971        processor,
1972        output: array_output,
1973        pool,
1974        ndarray_params,
1975        plugin_params,
1976        port_handle,
1977        array_counter: 0,
1978        std_array_data_param,
1979        min_callback_time: 0.0,
1980        last_process_time: None,
1981        sort_mode: 0,
1982        sort_time: 0.0,
1983        sort_size: 10,
1984        sort_buffer: SortBuffer::new(),
1985        dropped_arrays: dropped_arrays_counter,
1986        compression_aware,
1987        max_byte_rate: 0.0,
1988        throttler: super::throttler::Throttler::new(0.0),
1989        prev_input_array: None,
1990        dims_prev: Vec::new(),
1991        nd_array_addr: 0,
1992        max_threads: 1,
1993        num_threads: 1,
1994    }));
1995
1996    let data_enabled = enabled.clone();
1997    let data_blocking = blocking_mode.clone();
1998
1999    let mut array_sender = array_sender;
2000    array_sender.set_mode_flags(enabled, blocking_mode);
2001
2002    // Capture wiring info for data loop
2003    let sender_port_name = port_name.to_string();
2004    let initial_upstream = ndarray_port.to_string();
2005
2006    let data_jh = thread::Builder::new()
2007        .name(format!("plugin-data-{port_name}"))
2008        .spawn(move || {
2009            plugin_data_loop(
2010                shared,
2011                array_rx,
2012                param_rx,
2013                plugin_params,
2014                ndarray_params.array_counter,
2015                data_enabled,
2016                data_blocking,
2017                sender_port_name,
2018                initial_upstream,
2019                wiring,
2020            );
2021        })
2022        .expect("failed to spawn plugin data thread");
2023
2024    let handle = PluginRuntimeHandle {
2025        port_runtime,
2026        array_sender,
2027        array_output: array_output_for_handle,
2028        port_name: port_name.to_string(),
2029        ndarray_params,
2030        plugin_params,
2031    };
2032
2033    (handle, data_jh)
2034}
2035
2036#[cfg(test)]
2037mod tests {
2038    use super::*;
2039    use crate::ndarray::{NDDataType, NDDimension};
2040    use crate::plugin::channel::ndarray_channel;
2041
2042    /// Passthrough processor: returns the input array as-is.
2043    struct PassthroughProcessor;
2044
2045    impl NDPluginProcess for PassthroughProcessor {
2046        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2047            ProcessResult::arrays(vec![Arc::new(array.clone())])
2048        }
2049        fn plugin_type(&self) -> &str {
2050            "Passthrough"
2051        }
2052    }
2053
2054    /// Sink processor: consumes arrays, returns nothing.
2055    struct SinkProcessor {
2056        count: usize,
2057    }
2058
2059    impl NDPluginProcess for SinkProcessor {
2060        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2061            self.count += 1;
2062            ProcessResult::empty()
2063        }
2064        fn plugin_type(&self) -> &str {
2065            "Sink"
2066        }
2067    }
2068
2069    fn make_test_array(id: i32) -> Arc<NDArray> {
2070        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
2071        arr.unique_id = id;
2072        Arc::new(arr)
2073    }
2074
2075    fn test_wiring() -> Arc<WiringRegistry> {
2076        Arc::new(WiringRegistry::new())
2077    }
2078
2079    /// Enable callbacks on a plugin handle (plugins default to disabled).
2080    fn enable_callbacks(handle: &PluginRuntimeHandle) {
2081        handle
2082            .port_runtime()
2083            .port_handle()
2084            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
2085            .unwrap();
2086        std::thread::sleep(std::time::Duration::from_millis(10));
2087    }
2088
2089    /// Send an array via the sender from a sync test context.
2090    /// Uses a dedicated thread with a current-thread runtime to avoid
2091    /// interfering with the plugin's own runtime.
2092    fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
2093        let sender = sender.clone();
2094        let jh = std::thread::spawn(move || {
2095            let rt = tokio::runtime::Builder::new_current_thread()
2096                .enable_all()
2097                .build()
2098                .unwrap();
2099            rt.block_on(sender.publish(array));
2100        });
2101        jh.join().unwrap();
2102    }
2103
2104    #[test]
2105    fn test_passthrough_runtime() {
2106        let pool = Arc::new(NDArrayPool::new(1_000_000));
2107
2108        // Create downstream receiver
2109        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2110        let mut output = NDArrayOutput::new();
2111        output.add(downstream_sender);
2112
2113        let (handle, _data_jh) = create_plugin_runtime_with_output(
2114            "PASS1",
2115            PassthroughProcessor,
2116            pool,
2117            10,
2118            output,
2119            "",
2120            test_wiring(),
2121        );
2122        enable_callbacks(&handle);
2123
2124        // Send an array
2125        send_array(handle.array_sender(), make_test_array(42));
2126
2127        // Should come out the other side
2128        let received = downstream_rx.blocking_recv().unwrap();
2129        assert_eq!(received.unique_id, 42);
2130    }
2131
2132    #[test]
2133    fn test_sink_runtime() {
2134        let pool = Arc::new(NDArrayPool::new(1_000_000));
2135
2136        let (handle, _data_jh) = create_plugin_runtime(
2137            "SINK1",
2138            SinkProcessor { count: 0 },
2139            pool,
2140            10,
2141            "",
2142            test_wiring(),
2143        );
2144        enable_callbacks(&handle);
2145
2146        // Send arrays - they should be consumed silently
2147        send_array(handle.array_sender(), make_test_array(1));
2148        send_array(handle.array_sender(), make_test_array(2));
2149
2150        // Give processing thread time
2151        std::thread::sleep(std::time::Duration::from_millis(50));
2152
2153        // No crash, no output needed
2154        assert_eq!(handle.port_name(), "SINK1");
2155    }
2156
2157    #[test]
2158    fn test_plugin_type_param() {
2159        let pool = Arc::new(NDArrayPool::new(1_000_000));
2160
2161        let (handle, _data_jh) = create_plugin_runtime(
2162            "TYPE_TEST",
2163            PassthroughProcessor,
2164            pool,
2165            10,
2166            "",
2167            test_wiring(),
2168        );
2169
2170        // Verify port name
2171        assert_eq!(handle.port_name(), "TYPE_TEST");
2172        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
2173    }
2174
2175    #[test]
2176    fn test_shutdown_on_handle_drop() {
2177        let pool = Arc::new(NDArrayPool::new(1_000_000));
2178
2179        let (handle, data_jh) = create_plugin_runtime(
2180            "SHUTDOWN_TEST",
2181            PassthroughProcessor,
2182            pool,
2183            10,
2184            "",
2185            test_wiring(),
2186        );
2187
2188        // Drop the handle (closes sender channel, which should cause data thread to exit)
2189        let sender = handle.array_sender().clone();
2190        drop(handle);
2191        drop(sender);
2192
2193        // Data thread should terminate
2194        let result = data_jh.join();
2195        assert!(result.is_ok());
2196    }
2197
2198    #[test]
2199    fn test_wire_to_nonzero_ndarray_addr() {
2200        // G6: a multi-address upstream plugin registers its output under every
2201        // address in 0..max_addr. A downstream consumer must be able to select
2202        // NDArrayAddr=1 and actually receive arrays — previously the output was
2203        // registered under the bare port name only, so the "PORT:1" key was
2204        // missing and rewire failed with "not found".
2205        use crate::plugin::wiring::upstream_key;
2206        let pool = Arc::new(NDArrayPool::new(1_000_000));
2207        let wiring = test_wiring();
2208
2209        // Upstream plugin advertises 2 addresses.
2210        let (up_handle, _up_jh) = create_plugin_runtime_multi_addr(
2211            "UP_MULTI",
2212            PassthroughProcessor,
2213            pool,
2214            10,
2215            "",
2216            wiring.clone(),
2217            2,
2218        );
2219        enable_callbacks(&up_handle);
2220
2221        // The "PORT:1" key must resolve to the same output as the bare port.
2222        let addr0 = wiring.lookup_output("UP_MULTI");
2223        let addr1 = wiring.lookup_output(&upstream_key("UP_MULTI", 1));
2224        assert!(addr0.is_some(), "addr 0 output must be registered");
2225        assert!(
2226            addr1.is_some(),
2227            "addr 1 output must be registered for a max_addr=2 port"
2228        );
2229
2230        // Wire a downstream consumer to UP_MULTI address 1.
2231        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWN_ADDR1", 10);
2232        wiring
2233            .rewire(&downstream_sender, "", &upstream_key("UP_MULTI", 1))
2234            .expect("wiring a consumer to NDArrayAddr=1 must succeed");
2235
2236        // An array sent through the upstream must reach the addr-1 consumer.
2237        send_array(up_handle.array_sender(), make_test_array(99));
2238        let received = downstream_rx.blocking_recv().unwrap();
2239        assert_eq!(
2240            received.unique_id, 99,
2241            "consumer wired to NDArrayAddr=1 must receive upstream arrays"
2242        );
2243    }
2244
2245    #[test]
2246    fn test_nonblocking_passthrough() {
2247        let pool = Arc::new(NDArrayPool::new(1_000_000));
2248        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2249        let mut output = NDArrayOutput::new();
2250        output.add(downstream_sender);
2251
2252        let (handle, _data_jh) = create_plugin_runtime_with_output(
2253            "NB_TEST",
2254            PassthroughProcessor,
2255            pool,
2256            10,
2257            output,
2258            "",
2259            test_wiring(),
2260        );
2261        enable_callbacks(&handle);
2262
2263        send_array(handle.array_sender(), make_test_array(42));
2264
2265        let received = downstream_rx.blocking_recv().unwrap();
2266        assert_eq!(received.unique_id, 42);
2267    }
2268
2269    #[test]
2270    fn test_blocking_to_nonblocking_switch() {
2271        let pool = Arc::new(NDArrayPool::new(1_000_000));
2272        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2273        let mut output = NDArrayOutput::new();
2274        output.add(downstream_sender);
2275
2276        let (handle, _data_jh) = create_plugin_runtime_with_output(
2277            "SWITCH_TEST",
2278            PassthroughProcessor,
2279            pool,
2280            10,
2281            output,
2282            "",
2283            test_wiring(),
2284        );
2285        enable_callbacks(&handle);
2286
2287        // Start in blocking mode
2288        handle
2289            .port_runtime()
2290            .port_handle()
2291            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
2292            .unwrap();
2293        std::thread::sleep(std::time::Duration::from_millis(50));
2294
2295        send_array(handle.array_sender(), make_test_array(1));
2296        let received = downstream_rx.blocking_recv().unwrap();
2297        assert_eq!(received.unique_id, 1);
2298
2299        // Switch back to non-blocking
2300        handle
2301            .port_runtime()
2302            .port_handle()
2303            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
2304            .unwrap();
2305        std::thread::sleep(std::time::Duration::from_millis(50));
2306
2307        // Send in non-blocking mode — goes through channel to data thread
2308        send_array(handle.array_sender(), make_test_array(2));
2309        let received = downstream_rx.blocking_recv().unwrap();
2310        assert_eq!(received.unique_id, 2);
2311    }
2312
2313    #[test]
2314    fn test_enable_callbacks_disables_processing() {
2315        let pool = Arc::new(NDArrayPool::new(1_000_000));
2316        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2317        let mut output = NDArrayOutput::new();
2318        output.add(downstream_sender);
2319
2320        let (handle, _data_jh) = create_plugin_runtime_with_output(
2321            "ENABLE_TEST",
2322            PassthroughProcessor,
2323            pool,
2324            10,
2325            output,
2326            "",
2327            test_wiring(),
2328        );
2329
2330        // Disable callbacks
2331        handle
2332            .port_runtime()
2333            .port_handle()
2334            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
2335            .unwrap();
2336        std::thread::sleep(std::time::Duration::from_millis(50));
2337
2338        // Send array — should be silently dropped by sender (callbacks disabled)
2339        send_array(handle.array_sender(), make_test_array(99));
2340
2341        // Verify nothing received (with timeout)
2342        let rt = tokio::runtime::Builder::new_current_thread()
2343            .enable_all()
2344            .build()
2345            .unwrap();
2346        let result = rt.block_on(async {
2347            tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
2348        });
2349        assert!(
2350            result.is_err(),
2351            "should not receive array when callbacks disabled"
2352        );
2353    }
2354
2355    #[test]
2356    fn test_downstream_receives_multiple() {
2357        let pool = Arc::new(NDArrayPool::new(1_000_000));
2358
2359        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
2360        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
2361        let mut output = NDArrayOutput::new();
2362        output.add(ds1);
2363        output.add(ds2);
2364
2365        let (handle, _data_jh) = create_plugin_runtime_with_output(
2366            "DS_TEST",
2367            PassthroughProcessor,
2368            pool,
2369            10,
2370            output,
2371            "",
2372            test_wiring(),
2373        );
2374        enable_callbacks(&handle);
2375
2376        send_array(handle.array_sender(), make_test_array(77));
2377
2378        // Both downstream receivers should have the array
2379        let r1 = rx1.blocking_recv().unwrap();
2380        let r2 = rx2.blocking_recv().unwrap();
2381        assert_eq!(r1.unique_id, 77);
2382        assert_eq!(r2.unique_id, 77);
2383    }
2384
2385    #[test]
2386    fn test_param_updates_after_send() {
2387        let pool = Arc::new(NDArrayPool::new(1_000_000));
2388
2389        struct ParamTracker;
2390        impl NDPluginProcess for ParamTracker {
2391            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2392                ProcessResult::arrays(vec![Arc::new(array.clone())])
2393            }
2394            fn plugin_type(&self) -> &str {
2395                "ParamTracker"
2396            }
2397        }
2398
2399        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2400        let mut output = NDArrayOutput::new();
2401        output.add(downstream_sender);
2402
2403        let (handle, _data_jh) = create_plugin_runtime_with_output(
2404            "PARAM_TEST",
2405            ParamTracker,
2406            pool,
2407            10,
2408            output,
2409            "",
2410            test_wiring(),
2411        );
2412        enable_callbacks(&handle);
2413
2414        // Send array
2415        send_array(handle.array_sender(), make_test_array(1));
2416        let received = downstream_rx.blocking_recv().unwrap();
2417        assert_eq!(received.unique_id, 1);
2418
2419        // Write enable_callbacks — should not crash
2420        handle
2421            .port_runtime()
2422            .port_handle()
2423            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
2424            .unwrap();
2425        std::thread::sleep(std::time::Duration::from_millis(50));
2426
2427        // Still works after param update
2428        send_array(handle.array_sender(), make_test_array(2));
2429        let received = downstream_rx.blocking_recv().unwrap();
2430        assert_eq!(received.unique_id, 2);
2431    }
2432
2433    #[test]
2434    fn test_sort_buffer_reorders_by_unique_id() {
2435        let mut buf = SortBuffer::new();
2436
2437        // Insert out of order: 3, 1, 2
2438        buf.insert(3, vec![make_test_array(3)], 10);
2439        buf.insert(1, vec![make_test_array(1)], 10);
2440        buf.insert(2, vec![make_test_array(2)], 10);
2441
2442        assert_eq!(buf.len(), 3);
2443
2444        let drained = buf.drain_all();
2445        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2446        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
2447        assert_eq!(buf.len(), 0);
2448        assert_eq!(buf.prev_unique_id, 3);
2449    }
2450
2451    #[test]
2452    fn test_sort_buffer_drain_ready_contiguous() {
2453        // B3: drain_ready releases the head while the next-expected uniqueId
2454        // is contiguous, even when later ids are still missing.
2455        let mut buf = SortBuffer::new();
2456        // Mark a prior emission (prev=0) so the contiguity path is active;
2457        // C++ only uses the deadline for the very first output array.
2458        buf.note_emitted(0);
2459        buf.insert(1, vec![make_test_array(1)], 10);
2460        buf.insert(2, vec![make_test_array(2)], 10);
2461        buf.insert(5, vec![make_test_array(5)], 10); // gap: 3,4 missing
2462
2463        // sort_time large → only contiguity drives release.
2464        let drained = buf.drain_ready(100.0);
2465        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2466        assert_eq!(ids, vec![1, 2], "contiguous run released; id=5 held by gap");
2467        assert_eq!(buf.len(), 1);
2468    }
2469
2470    #[test]
2471    fn test_sort_buffer_drain_ready_deadline() {
2472        // B3: a stale head is released past sort_time even with a gap.
2473        let mut buf = SortBuffer::new();
2474        buf.note_emitted(1); // prev=1
2475        buf.insert(5, vec![make_test_array(5)], 10); // out of order
2476        std::thread::sleep(std::time::Duration::from_millis(30));
2477        // sort_time=0.01s → head aged past deadline → released.
2478        let drained = buf.drain_ready(0.01);
2479        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2480        assert_eq!(ids, vec![5], "stale head released via deadline");
2481    }
2482
2483    #[test]
2484    fn test_sort_buffer_detects_disordered_on_emit() {
2485        // B4: disorder is counted at emission time.
2486        let mut buf = SortBuffer::new();
2487        buf.note_emitted(5); // prev=5, first_output now false
2488        buf.note_emitted(3); // 3 != 5 and != 6 → disordered
2489        assert_eq!(buf.disordered_arrays, 1);
2490        buf.note_emitted(4); // 4 != 3 and != 4? 4 == prev+1 → ordered
2491        assert_eq!(buf.disordered_arrays, 1);
2492    }
2493
2494    #[test]
2495    fn test_sort_buffer_drops_when_full() {
2496        let mut buf = SortBuffer::new();
2497
2498        // sort_size=2: third insert is refused.
2499        assert!(buf.insert(1, vec![make_test_array(1)], 2));
2500        assert!(buf.insert(2, vec![make_test_array(2)], 2));
2501        assert!(!buf.insert(3, vec![make_test_array(3)], 2));
2502
2503        assert_eq!(buf.len(), 2);
2504        assert_eq!(buf.dropped_output_arrays, 1);
2505    }
2506
2507    #[test]
2508    fn test_sort_mode_runtime_integration() {
2509        let pool = Arc::new(NDArrayPool::new(1_000_000));
2510        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2511        let mut output = NDArrayOutput::new();
2512        output.add(downstream_sender);
2513
2514        let (handle, _data_jh) = create_plugin_runtime_with_output(
2515            "SORT_TEST",
2516            PassthroughProcessor,
2517            pool,
2518            10,
2519            output,
2520            "",
2521            test_wiring(),
2522        );
2523        enable_callbacks(&handle);
2524
2525        // Enable sort mode with sort_size=10 and a sort_time deadline.
2526        handle
2527            .port_runtime()
2528            .port_handle()
2529            .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
2530            .unwrap();
2531        handle
2532            .port_runtime()
2533            .port_handle()
2534            .write_float64_blocking(handle.plugin_params.sort_time, 0, 0.1)
2535            .unwrap();
2536        handle
2537            .port_runtime()
2538            .port_handle()
2539            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
2540            .unwrap();
2541        std::thread::sleep(std::time::Duration::from_millis(50));
2542
2543        // B2: in-order arrays (1,2,3) must be emitted IMMEDIATELY via the
2544        // fast path — they are NOT delayed by the sort buffer.
2545        send_array(handle.array_sender(), make_test_array(1));
2546        send_array(handle.array_sender(), make_test_array(2));
2547        send_array(handle.array_sender(), make_test_array(3));
2548
2549        let rt = tokio::runtime::Builder::new_current_thread()
2550            .enable_all()
2551            .build()
2552            .unwrap();
2553        let fast = rt.block_on(async {
2554            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
2555        });
2556        assert!(
2557            fast.is_ok(),
2558            "in-order arrays must be emitted immediately, not buffered"
2559        );
2560        assert_eq!(fast.unwrap().unwrap().unique_id, 1);
2561        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 2);
2562        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 3);
2563
2564        // B3: now send out of order (5 before 4). prev=3, so 4 is in-order
2565        // and emitted immediately; 5 arrives first, is buffered, then 4
2566        // unblocks it.
2567        send_array(handle.array_sender(), make_test_array(5));
2568        send_array(handle.array_sender(), make_test_array(4));
2569        std::thread::sleep(std::time::Duration::from_millis(50));
2570        // 4 emitted immediately (in order), then 5 released by contiguity.
2571        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 4);
2572        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 5);
2573    }
2574
2575    #[test]
2576    fn test_throttle_drops_output_arrays() {
2577        // G7: with a tiny MaxByteRate, output arrays exceeding the byte budget
2578        // are dropped and counted into DroppedOutputArrays.
2579        let pool = Arc::new(NDArrayPool::new(1_000_000));
2580        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2581        let mut output = NDArrayOutput::new();
2582        output.add(downstream_sender);
2583
2584        let (handle, _data_jh) = create_plugin_runtime_with_output(
2585            "THROTTLE_TEST",
2586            PassthroughProcessor,
2587            pool,
2588            10,
2589            output,
2590            "",
2591            test_wiring(),
2592        );
2593        enable_callbacks(&handle);
2594
2595        // MaxByteRate = 8 bytes/sec. Each test array is 4 bytes; the bucket
2596        // starts full at 8, so the first two pass and the rest are dropped.
2597        handle
2598            .port_runtime()
2599            .port_handle()
2600            .write_float64_blocking(handle.plugin_params.max_byte_rate, 0, 8.0)
2601            .unwrap();
2602        std::thread::sleep(std::time::Duration::from_millis(20));
2603
2604        for id in 1..=5 {
2605            send_array(handle.array_sender(), make_test_array(id));
2606        }
2607        std::thread::sleep(std::time::Duration::from_millis(50));
2608
2609        // Drain whatever made it through — strictly fewer than 5.
2610        let rt = tokio::runtime::Builder::new_current_thread()
2611            .enable_all()
2612            .build()
2613            .unwrap();
2614        let mut received = 0;
2615        while rt
2616            .block_on(async {
2617                tokio::time::timeout(std::time::Duration::from_millis(20), downstream_rx.recv())
2618                    .await
2619            })
2620            .map(|o| o.is_some())
2621            .unwrap_or(false)
2622        {
2623            received += 1;
2624        }
2625        assert!(
2626            received < 5,
2627            "throttle must drop some arrays (got {received})"
2628        );
2629        assert!(received >= 1, "first array within budget must pass");
2630    }
2631
2632    #[test]
2633    fn test_process_plugin_reprocesses_last_input() {
2634        // G5: writing ProcessPlugin re-injects the cached last input array.
2635        let pool = Arc::new(NDArrayPool::new(1_000_000));
2636        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2637        let mut output = NDArrayOutput::new();
2638        output.add(downstream_sender);
2639
2640        let (handle, _data_jh) = create_plugin_runtime_with_output(
2641            "PROCESS_PLUGIN_TEST",
2642            PassthroughProcessor,
2643            pool,
2644            10,
2645            output,
2646            "",
2647            test_wiring(),
2648        );
2649        enable_callbacks(&handle);
2650
2651        send_array(handle.array_sender(), make_test_array(7));
2652        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 7);
2653
2654        // Trigger ProcessPlugin — the cached input (id=7) is reprocessed.
2655        handle
2656            .port_runtime()
2657            .port_handle()
2658            .write_int32_blocking(handle.plugin_params.process_plugin, 0, 1)
2659            .unwrap();
2660        let reprocessed = downstream_rx.blocking_recv().unwrap();
2661        assert_eq!(
2662            reprocessed.unique_id, 7,
2663            "ProcessPlugin re-emits last input"
2664        );
2665    }
2666
2667    #[test]
2668    fn test_min_callback_time_drop_counts() {
2669        // B5: a MinCallbackTime-throttled array is dropped — verify the data
2670        // loop does not emit it (silent loss is the bug being fixed; the
2671        // DroppedArrays param increment is covered by the integration tests).
2672        let pool = Arc::new(NDArrayPool::new(1_000_000));
2673        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2674        let mut output = NDArrayOutput::new();
2675        output.add(downstream_sender);
2676
2677        let (handle, _data_jh) = create_plugin_runtime_with_output(
2678            "MIN_CB_TEST",
2679            PassthroughProcessor,
2680            pool,
2681            10,
2682            output,
2683            "",
2684            test_wiring(),
2685        );
2686        enable_callbacks(&handle);
2687
2688        // 10s minimum between callbacks — only the first array gets through.
2689        handle
2690            .port_runtime()
2691            .port_handle()
2692            .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 10.0)
2693            .unwrap();
2694        std::thread::sleep(std::time::Duration::from_millis(20));
2695
2696        send_array(handle.array_sender(), make_test_array(1));
2697        send_array(handle.array_sender(), make_test_array(2));
2698        std::thread::sleep(std::time::Duration::from_millis(50));
2699
2700        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 1);
2701        let rt = tokio::runtime::Builder::new_current_thread()
2702            .enable_all()
2703            .build()
2704            .unwrap();
2705        let second = rt.block_on(async {
2706            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
2707        });
2708        assert!(
2709            second.is_err(),
2710            "second array throttled out by MinCallbackTime"
2711        );
2712    }
2713
2714    #[test]
2715    fn test_process_plugin_skips_throttled_input() {
2716        // R2: a MinCallbackTime-throttled frame must NOT be cached as the
2717        // ProcessPlugin input. After array 1 is processed and array 2 is
2718        // dropped by the throttle, ProcessPlugin must re-inject array 1
2719        // (the last *processed* array), not the dropped array 2.
2720        let pool = Arc::new(NDArrayPool::new(1_000_000));
2721        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2722        let mut output = NDArrayOutput::new();
2723        output.add(downstream_sender);
2724
2725        let (handle, _data_jh) = create_plugin_runtime_with_output(
2726            "PROCESS_THROTTLE_TEST",
2727            PassthroughProcessor,
2728            pool,
2729            10,
2730            output,
2731            "",
2732            test_wiring(),
2733        );
2734        enable_callbacks(&handle);
2735
2736        // 10s minimum between callbacks — only the first array is processed.
2737        handle
2738            .port_runtime()
2739            .port_handle()
2740            .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 10.0)
2741            .unwrap();
2742        std::thread::sleep(std::time::Duration::from_millis(20));
2743
2744        send_array(handle.array_sender(), make_test_array(1));
2745        send_array(handle.array_sender(), make_test_array(2));
2746        std::thread::sleep(std::time::Duration::from_millis(50));
2747
2748        // Array 1 was processed and emitted; array 2 was throttled out.
2749        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 1);
2750
2751        // ProcessPlugin re-injects the cached input. The cache must still hold
2752        // array 1, because array 2 never passed the throttle gate. The
2753        // re-injected array itself is also subject to the throttle, so reset
2754        // MinCallbackTime to 0 first so the re-injected frame is processed.
2755        handle
2756            .port_runtime()
2757            .port_handle()
2758            .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 0.0)
2759            .unwrap();
2760        std::thread::sleep(std::time::Duration::from_millis(20));
2761        handle
2762            .port_runtime()
2763            .port_handle()
2764            .write_int32_blocking(handle.plugin_params.process_plugin, 0, 1)
2765            .unwrap();
2766        let reprocessed = downstream_rx.blocking_recv().unwrap();
2767        assert_eq!(
2768            reprocessed.unique_id, 1,
2769            "ProcessPlugin must re-inject the last processed array (1), not the throttled array (2)"
2770        );
2771    }
2772
2773    #[test]
2774    fn test_g3_compressed_array_dropped_on_non_aware_plugin() {
2775        // G3: a non-compression-aware plugin drops a compressed array.
2776        let pool = Arc::new(NDArrayPool::new(1_000_000));
2777        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2778        let mut output = NDArrayOutput::new();
2779        output.add(downstream_sender);
2780
2781        let (handle, _data_jh) = create_plugin_runtime_with_output(
2782            "G3_TEST",
2783            PassthroughProcessor, // compression_aware() defaults to false
2784            pool,
2785            10,
2786            output,
2787            "",
2788            test_wiring(),
2789        );
2790        enable_callbacks(&handle);
2791
2792        // A compressed array must be dropped, not forwarded.
2793        let mut compressed = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
2794        compressed.unique_id = 1;
2795        compressed.codec = Some(crate::codec::Codec {
2796            name: crate::codec::CodecName::JPEG,
2797            compressed_size: 16,
2798            level: 0,
2799            shuffle: 0,
2800            compressor: 0,
2801        });
2802        send_array(handle.array_sender(), Arc::new(compressed));
2803
2804        // An uncompressed array passes through normally.
2805        send_array(handle.array_sender(), make_test_array(2));
2806
2807        let r = downstream_rx.blocking_recv().unwrap();
2808        assert_eq!(
2809            r.unique_id, 2,
2810            "compressed array dropped; only the raw array reaches downstream"
2811        );
2812    }
2813
2814    #[test]
2815    fn test_drop_on_full_increments_dropped_counter() {
2816        // B1/G1: a slow downstream plugin with a tiny input queue drops arrays
2817        // when the queue is full; the drop is counted in the plugin's shared
2818        // DroppedArrays counter rather than back-pressuring the producer.
2819        struct SlowProcessor;
2820        impl NDPluginProcess for SlowProcessor {
2821            fn process_array(&mut self, _a: &NDArray, _p: &NDArrayPool) -> ProcessResult {
2822                std::thread::sleep(std::time::Duration::from_millis(200));
2823                ProcessResult::empty()
2824            }
2825            fn plugin_type(&self) -> &str {
2826                "Slow"
2827            }
2828        }
2829        let pool = Arc::new(NDArrayPool::new(1_000_000));
2830
2831        // Downstream plugin with queue size 1 and a slow processor.
2832        let (downstream_handle, _ds_jh) =
2833            create_plugin_runtime("B1_DOWNSTREAM", SlowProcessor, pool, 1, "", test_wiring());
2834        enable_callbacks(&downstream_handle);
2835        let ds_sender = downstream_handle.array_sender().clone();
2836        let dropped = ds_sender.dropped_arrays_counter().clone();
2837
2838        // First array is taken by the data loop (now sleeping 200ms); second
2839        // fills the 1-slot queue; the rest find a full queue → dropped.
2840        send_array(&ds_sender, make_test_array(1));
2841        send_array(&ds_sender, make_test_array(2));
2842        send_array(&ds_sender, make_test_array(3));
2843        send_array(&ds_sender, make_test_array(4));
2844
2845        assert!(
2846            dropped.load(Ordering::Acquire) >= 1,
2847            "arrays dropped on a full queue must be counted (got {})",
2848            dropped.load(Ordering::Acquire)
2849        );
2850    }
2851
2852    #[test]
2853    fn test_cross_width_narrowing_array_read_truncates() {
2854        // Cross-width integer narrowing array reads must TRUNCATE (wrapping),
2855        // matching the C cast in C++ NDArrayPool.cpp:388 `convertType`
2856        //   *pDataOut++ = (dataTypeOut)(*pDataIn++);
2857        // A C cast `(epicsInt8)(epicsUInt16)300` keeps the low 8 bits == 44.
2858        // The f64 round-trip in copy_convert would SATURATE (`300.0 as i8`
2859        // == 127) and diverge from C++ — copy_ccast must be used instead.
2860
2861        // U16 -> i8: 300 = 0x012C; low byte 0x2C = 44.
2862        let mut out = [0i8; 1];
2863        let n = copy_ccast(&[300u16], &mut out);
2864        assert_eq!(n, 1);
2865        assert_eq!(out[0], 44, "(epicsInt8)(epicsUInt16)300 == 44 (low 8 bits)");
2866        // copy_convert would have saturated:
2867        let mut sat = [0i8; 1];
2868        copy_convert(&[300u16], &mut sat);
2869        assert_eq!(sat[0], 127, "f64 round-trip saturates — the wrong behavior");
2870
2871        // I32 -> i8: 0x1234_5678 -> low byte 0x78 = 120.
2872        let mut out2 = [0i8; 1];
2873        copy_ccast(&[0x1234_5678i32], &mut out2);
2874        assert_eq!(out2[0], 0x78);
2875
2876        // I32 -> i8: -1 stays -1 (all-ones low byte).
2877        let mut out3 = [0i8; 1];
2878        copy_ccast(&[-1i32], &mut out3);
2879        assert_eq!(out3[0], -1);
2880
2881        // U16 -> i8: 0x00FF = 255 -> low byte 0xFF reinterpreted as i8 == -1.
2882        let mut out4 = [0i8; 1];
2883        copy_ccast(&[255u16], &mut out4);
2884        assert_eq!(out4[0], -1);
2885
2886        // I64 -> i32: 0x0000_0001_0000_002A -> low 32 bits == 42.
2887        let mut out5 = [0i32; 1];
2888        copy_ccast(&[0x0000_0001_0000_002Ai64], &mut out5);
2889        assert_eq!(out5[0], 42);
2890
2891        // U32 -> i16: 70000 = 0x0001_1170 -> low 16 bits 0x1170 == 4464.
2892        let mut out6 = [0i16; 1];
2893        copy_ccast(&[70000u32], &mut out6);
2894        assert_eq!(out6[0], 4464);
2895
2896        // Same-width sign change still works as a bitwise reinterpret:
2897        // U8 255 -> i8 -1.
2898        let mut out7 = [0i8; 1];
2899        copy_ccast(&[255u8], &mut out7);
2900        assert_eq!(out7[0], -1);
2901
2902        // F64 out-of-range -> i32 still routes through copy_convert (the
2903        // `convert:` arm for float sources). C++ converts float->int with a
2904        // C cast too, but the runtime keeps the f64 numeric path for float
2905        // sources; this asserts the integer-narrowing fix did not change the
2906        // float-source path.
2907        let mut fout = [0i32; 1];
2908        copy_convert(&[42.9f64], &mut fout);
2909        assert_eq!(fout[0], 42, "f64 -> i32 truncates toward zero");
2910    }
2911}