Skip to main content

ad_core_rs/plugin/
runtime.rs

1//! Plugin runtime: control plane (PortActor) + data plane (processing thread).
2//!
3//! # Single-threaded data plane (intentional, G4)
4//!
5//! C++ `NDPluginDriver` runs `numThreads` worker threads sharing one input
6//! queue (`createCallbackThreads`). The Rust port deliberately runs **exactly
7//! one** per-plugin data thread driving a `tokio::select!` loop. This is an
8//! intentional design choice: a single owner of the processing state removes
9//! the C++ worker-pool races (shared `prevUniqueId_`, sort-buffer contention)
10//! and keeps array ordering trivially correct. The `NUM_THREADS` / `MAX_THREADS`
11//! PVs are therefore not backed by a real worker pool — instead `NumThreads`
12//! is validated and clamped to `[1, MaxThreads]` on write and the clamped
13//! value is written back, so the PV is honest about the accepted value rather
14//! than silently inert.
15
16use std::collections::BTreeMap;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::thread;
20
21use asyn_rs::error::AsynResult;
22use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
23use asyn_rs::runtime::config::RuntimeConfig;
24use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
25use asyn_rs::user::AsynUser;
26
27use asyn_rs::port_handle::PortHandle;
28
29use crate::ndarray::NDArray;
30use crate::ndarray_pool::NDArrayPool;
31use crate::params::ndarray_driver::NDArrayDriverParams;
32
33use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
34use super::params::PluginBaseParams;
35use super::wiring::{WiringRegistry, upstream_key};
36
/// Payload of a single parameter write, forwarded from the control plane to
/// the data plane over the param-change channel.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    Int32(i32),
    Float64(f64),
    Octet(String),
}

impl ParamChangeValue {
    /// Integer view of the value.
    ///
    /// `Float64` is converted with an `as i32` cast (fraction discarded,
    /// out-of-range values saturate, NaN becomes 0); `Octet` always yields 0.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(n) => n,
            Self::Float64(x) => x as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Floating-point view of the value.
    ///
    /// `Int32` is widened with an `as f64` cast; `Octet` always yields 0.0.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Float64(x) => x,
            Self::Int32(n) => n as f64,
            Self::Octet(_) => 0.0,
        }
    }

    /// Borrow the string payload, or `None` when the value is numeric.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
69
/// A single parameter update produced by a plugin's process_array.
///
/// Each variant names the parameter (`reason`), the sub-address it applies to
/// (`addr`) and the new `value`. The common traits are derived so updates can
/// be logged with `{:?}`, duplicated, and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamUpdate {
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Create an Int32 update at addr 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Create a Float64 update at addr 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Create an Int32 update at a specific addr.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Create a Float64 update at a specific addr.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// Create a Float64Array update at addr 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Create a Float64Array update at a specific addr.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array {
            reason,
            addr,
            value,
        }
    }

    /// Create an Octet (string) update at addr 0.
    pub fn octet(reason: usize, value: String) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// Create an Octet (string) update at a specific addr.
    pub fn octet_addr(reason: usize, addr: i32, value: String) -> Self {
        Self::Octet {
            reason,
            addr,
            value,
        }
    }
}
160
161/// Result of processing one array: output arrays + param updates to write back.
162pub struct ProcessResult {
163    pub output_arrays: Vec<Arc<NDArray>>,
164    pub param_updates: Vec<ParamUpdate>,
165    /// If set, only publish to the subscriber at this index (round-robin scatter).
166    pub scatter_index: Option<usize>,
167}
168
169impl ProcessResult {
170    /// Convenience: sink plugin with only param updates, no output arrays.
171    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
172        Self {
173            output_arrays: vec![],
174            param_updates,
175            scatter_index: None,
176        }
177    }
178
179    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
180    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
181        Self {
182            output_arrays,
183            param_updates: vec![],
184            scatter_index: None,
185        }
186    }
187
188    /// Convenience: no outputs, no param updates.
189    pub fn empty() -> Self {
190        Self {
191            output_arrays: vec![],
192            param_updates: vec![],
193            scatter_index: None,
194        }
195    }
196
197    /// Convenience: scatter output — send to a single subscriber by index.
198    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
199        Self {
200            output_arrays,
201            param_updates: vec![],
202            scatter_index: Some(index),
203        }
204    }
205}
206
207/// Result of handling a control-plane param change.
208pub struct ParamChangeResult {
209    pub output_arrays: Vec<Arc<NDArray>>,
210    pub param_updates: Vec<ParamUpdate>,
211}
212
213impl ParamChangeResult {
214    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
215        Self {
216            output_arrays: vec![],
217            param_updates,
218        }
219    }
220
221    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
222        Self {
223            output_arrays,
224            param_updates: vec![],
225        }
226    }
227
228    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
229        Self {
230            output_arrays,
231            param_updates,
232        }
233    }
234
235    pub fn empty() -> Self {
236        Self {
237            output_arrays: vec![],
238            param_updates: vec![],
239        }
240    }
241}
242
/// Pure processing logic. No threading concerns.
///
/// Only `process_array` and `plugin_type` are required; every other method
/// has a default body that implementers may override.
pub trait NDPluginProcess: Send + 'static {
    /// Process one array. Return output arrays and param updates.
    ///
    /// `pool` is the array pool made available to the plugin for this call.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Plugin type name for PLUGIN_TYPE param.
    fn plugin_type(&self) -> &str;

    /// Whether this plugin can process compressed (`codec != None`) arrays
    /// (C++ `compressionAware_`, G3). Defaults to `false`: a plugin that
    /// operates on raw pixels must not be handed compressed bytes — the
    /// runtime drops compressed input and counts it into DroppedArrays.
    /// A codec/file plugin that understands compressed data overrides this.
    fn compression_aware(&self) -> bool {
        false
    }

    /// Register plugin-specific params on the base. Called once during construction.
    ///
    /// The default implementation registers nothing and returns `Ok(())`.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called when a param changes. Reason is the param index.
    /// Return param updates to be written back to the port driver.
    ///
    /// The default implementation ignores the change and returns
    /// `ParamChangeResult::empty()`.
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Return a handle to the latest NDArray data for array reads.
    /// Override this in plugins like NDPluginStdArrays that serve pixel data
    /// via readInt8Array/readInt16Array/etc.
    ///
    /// The default implementation returns `None`.
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
285
/// Read-only snapshot of param values available to the processing thread.
///
/// Passed by reference to `NDPluginProcess::on_param_change`.
pub struct PluginParamSnapshot {
    /// NOTE(review): presumably the EnableCallbacks state at the time of the
    /// change — confirm against the code that builds the snapshot.
    pub enable_callbacks: bool,
    /// The param reason that changed.
    pub reason: usize,
    /// The address (sub-device) that changed.
    pub addr: i32,
    /// The new value.
    pub value: ParamChangeValue,
}
296
297/// One buffered entry in the sort buffer: the output arrays for a uniqueId
298/// plus the instant they were inserted (for the per-element staleness
299/// deadline — C++ `sortedListElement::insertionTime_`).
300struct SortEntry {
301    arrays: Vec<Arc<NDArray>>,
302    inserted: std::time::Instant,
303}
304
305/// Sort buffer for reordering out-of-order output arrays by uniqueId.
306///
307/// Port of C++ `sortedNDArrayList_` semantics (NDPluginDriver.cpp).
308/// Only arrays that arrive *out of order* are buffered here — in-order
309/// arrays are emitted immediately by the caller (B2). The drain logic
310/// (`drain_ready`) releases the head while the next-expected uniqueId is
311/// contiguous OR the head has been buffered longer than `sort_time` (B3).
312struct SortBuffer {
313    /// Buffered out-of-order arrays keyed by uniqueId.
314    entries: BTreeMap<i32, SortEntry>,
315    /// uniqueId of the last array emitted downstream (C++ `prevUniqueId_`).
316    prev_unique_id: i32,
317    /// Whether any array has been emitted yet (C++ `firstOutputArray_`).
318    first_output: bool,
319    /// Cumulative count of arrays emitted out of order (C++ DisorderedArrays).
320    disordered_arrays: i32,
321    /// Cumulative count of arrays dropped because the buffer was full
322    /// (C++ DroppedOutputArrays — sort-buffer-overflow portion).
323    dropped_output_arrays: i32,
324}
325
326impl SortBuffer {
327    fn new() -> Self {
328        Self {
329            entries: BTreeMap::new(),
330            prev_unique_id: 0,
331            first_output: true,
332            disordered_arrays: 0,
333            dropped_output_arrays: 0,
334        }
335    }
336
337    /// True if `unique_id` follows `prev_unique_id` in order (C++ `orderOK`).
338    fn order_ok(&self, unique_id: i32) -> bool {
339        unique_id == self.prev_unique_id || unique_id == self.prev_unique_id + 1
340    }
341
342    /// Record that an array with `unique_id` was emitted downstream.
343    /// Updates `prev_unique_id` and counts a disorder if it was out of order.
344    fn note_emitted(&mut self, unique_id: i32) {
345        if !self.first_output && !self.order_ok(unique_id) {
346            self.disordered_arrays += 1;
347        }
348        self.first_output = false;
349        self.prev_unique_id = unique_id;
350    }
351
352    /// Insert an out-of-order array into the sort buffer.
353    ///
354    /// Returns `false` if the buffer was full and the array was dropped
355    /// (C++ NDPluginDriver.cpp:307-316), `true` if buffered.
356    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) -> bool {
357        if sort_size > 0 && self.entries.len() as i32 >= sort_size {
358            self.dropped_output_arrays += 1;
359            return false;
360        }
361        self.entries
362            .entry(unique_id)
363            .or_insert_with(|| SortEntry {
364                arrays: Vec::new(),
365                inserted: std::time::Instant::now(),
366            })
367            .arrays
368            .extend(arrays);
369        true
370    }
371
372    /// Drain the buffer head-first while either the next expected uniqueId is
373    /// contiguous OR the head element has aged past `sort_time` seconds.
374    /// Port of C++ `sortingTask` loop (NDPluginDriver.cpp:619-670).
375    fn drain_ready(&mut self, sort_time: f64) -> Vec<(i32, Vec<Arc<NDArray>>)> {
376        let now = std::time::Instant::now();
377        let mut out = Vec::new();
378        while let Some((&head_id, entry)) = self.entries.iter().next() {
379            let delta = now.duration_since(entry.inserted).as_secs_f64();
380            let order_ok = self.order_ok(head_id);
381            if (!self.first_output && order_ok) || delta > sort_time {
382                let entry = self.entries.remove(&head_id).unwrap();
383                self.note_emitted(head_id);
384                out.push((head_id, entry.arrays));
385            } else {
386                break;
387            }
388        }
389        out
390    }
391
392    /// Drain every buffered array in uniqueId order, regardless of contiguity
393    /// or age. Used when sort mode is turned off.
394    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
395        let entries = std::mem::take(&mut self.entries);
396        let mut out = Vec::with_capacity(entries.len());
397        for (id, entry) in entries {
398            self.note_emitted(id);
399            out.push((id, entry.arrays));
400        }
401        out
402    }
403
404    /// Number of uniqueId entries currently buffered.
405    fn len(&self) -> i32 {
406        self.entries.len() as i32
407    }
408}
409
/// Shared processor state protected by a mutex, accessible from both
/// the data thread (non-blocking mode) and the caller thread (blocking mode).
struct SharedProcessorInner<P: NDPluginProcess> {
    /// The plugin's processing logic; `process_and_publish` calls
    /// `process_array` on it.
    processor: P,
    /// Shared output handle.
    /// NOTE(review): presumably the downstream publication target — it is not
    /// used by the methods visible in this part of the file; confirm.
    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Array pool handed to `process_array` on every call.
    pool: Arc<NDArrayPool>,
    /// Param indices for the NDArray driver params (array counter, unique id,
    /// sizes, ...) used as `reason` when building param batches.
    ndarray_params: NDArrayDriverParams,
    /// Param indices for the plugin base params (sort/drop counters,
    /// execution time, ...) used as `reason` when building param batches.
    plugin_params: PluginBaseParams,
    /// Handle to the port; used to fire interrupt callbacks directly.
    port_handle: PortHandle,
    /// ArrayCounter — owned in the param library (C++ `NDArrayCounter`), held
    /// here only as a working copy that is kept in sync with the param so a
    /// control-plane write of `ARRAY_COUNTER` resets it (B12).
    array_counter: i32,
    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
    std_array_data_param: Option<usize>,
    /// MinCallbackTime throttling: minimum seconds between process calls.
    /// Values of zero or less disable the throttle.
    min_callback_time: f64,
    /// Last time process_and_publish was called (for throttling).
    last_process_time: Option<std::time::Instant>,
    /// Sort mode: 0 = disabled, 1 = sorted output.
    sort_mode: i32,
    /// Sort time: seconds — per-element staleness deadline for the sort buffer.
    sort_time: f64,
    /// Sort size: maximum number of uniqueId entries in the sort buffer.
    sort_size: i32,
    /// Sort buffer for reordering output arrays by uniqueId.
    sort_buffer: SortBuffer,
    /// Cumulative count of dropped *input* arrays (full queue / compression
    /// gate / MinCallbackTime throttle). Shared with every upstream sender so
    /// full-queue drops are visible here (G1, B1, B5).
    dropped_arrays: Arc<std::sync::atomic::AtomicI32>,
    /// Whether this plugin can process compressed (`codec != None`) arrays.
    /// A non-compression-aware plugin drops compressed input (G3).
    compression_aware: bool,
    /// Output byte-rate limit (C++ `MaxByteRate`); 0 disables throttling.
    max_byte_rate: f64,
    /// Token-bucket throttler enforcing `max_byte_rate` on the output path (G7).
    throttler: super::throttler::Throttler,
    /// Last *input* array, cached for ProcessPlugin re-injection
    /// (C++ `pPrevInputArray_`, G5). Released on `EnableCallbacks=0` (B6).
    prev_input_array: Option<Arc<NDArray>>,
    /// Previous array dimensions, for firing an NDDimensions int32-array
    /// callback when dimensions change (C++ `dimsPrev_`, G8).
    dims_prev: Vec<i32>,
    /// Source address selected via the NDArrayAddr PV (C++ `NDArrayAddr`, G6).
    nd_array_addr: i32,
    /// MaxThreads — the clamp ceiling for NumThreads (C++ `MaxThreads`).
    max_threads: i32,
    /// NumThreads — validated/clamped to [1, MaxThreads] on write (G4).
    num_threads: i32,
}
461
462impl<P: NDPluginProcess> SharedProcessorInner<P> {
463    fn should_throttle(&self) -> bool {
464        if self.min_callback_time <= 0.0 {
465            return false;
466        }
467        if let Some(last) = self.last_process_time {
468            last.elapsed().as_secs_f64() < self.min_callback_time
469        } else {
470            false
471        }
472    }
473
474    /// Byte cost of an array for throttling (C++ `NDPluginDriver::throttled`):
475    /// compressed size when a codec is present, else total raw bytes.
476    fn array_byte_cost(array: &NDArray) -> f64 {
477        match &array.codec {
478            Some(c) => c.compressed_size as f64,
479            None => array.info().total_bytes as f64,
480        }
481    }
482
483    /// Apply the output throttle to one array. Returns `true` if the array
484    /// should be emitted, `false` if it was dropped (and counts the drop).
485    fn throttle_ok(&mut self, array: &NDArray) -> bool {
486        if self.max_byte_rate == 0.0 {
487            return true;
488        }
489        let cost = Self::array_byte_cost(array);
490        if self.throttler.try_take(cost) {
491            true
492        } else {
493            self.sort_buffer.dropped_output_arrays += 1;
494            false
495        }
496    }
497
498    /// Route output arrays through the throttle, the in-order fast path, and
499    /// the sort buffer. Returns arrays ready to emit *now*, in order.
500    ///
501    /// Port of C++ `endProcessCallbacks` (NDPluginDriver.cpp:295-328): an
502    /// array whose uniqueId is contiguous with `prevUniqueId_` is emitted
503    /// immediately (B2); only out-of-order arrays enter the sort buffer.
504    /// Disordered arrays are counted at emission time in both modes (B4).
505    fn route_output_arrays(&mut self, arrays: Vec<Arc<NDArray>>) -> Vec<Arc<NDArray>> {
506        let mut ready = Vec::new();
507        for arr in arrays {
508            if !self.throttle_ok(&arr) {
509                continue; // G7: dropped by MaxByteRate throttle
510            }
511            let uid = arr.unique_id;
512            if self.sort_mode != 0
513                && !self.sort_buffer.first_output
514                && !self.sort_buffer.order_ok(uid)
515            {
516                // Out of order with sort mode on: buffer it (B2/B3).
517                self.sort_buffer.insert(uid, vec![arr], self.sort_size);
518            } else {
519                // In order (or sort mode off): emit immediately, count disorder.
520                self.sort_buffer.note_emitted(uid);
521                ready.push(arr);
522            }
523        }
524        // After emitting in-order arrays, the sort buffer head may now be
525        // contiguous — release any newly-ready run (C++ sortingTask).
526        if self.sort_mode != 0 {
527            for (_id, mut bucket) in self.sort_buffer.drain_ready(self.sort_time) {
528                ready.append(&mut bucket);
529            }
530        }
531        ready
532    }
533
534    /// Process array and return a `ProcessOutput`. Does NOT send to actor.
535    /// Direct interrupts (std_array_data_param) happen here (sync).
536    /// The returned output must be published and flushed by the caller in async context.
537    fn process_and_publish(&mut self, array: &Arc<NDArray>) -> Option<ProcessOutput> {
538        // B5: a MinCallbackTime-throttled array is dropped — count it.
539        if self.should_throttle() {
540            self.dropped_arrays
541                .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
542            return Some(self.dropped_arrays_only_batch());
543        }
544        // R2/G5: cache the input array for ProcessPlugin re-injection only
545        // for arrays that actually pass the MinCallbackTime gate and are
546        // processed. C++ sets pPrevInputArray_ in beginProcessCallbacks,
547        // which runs inside processCallbacks — never for throttled frames.
548        self.prev_input_array = Some(Arc::clone(array));
549        let t0 = std::time::Instant::now();
550        let result = self.processor.process_array(array, &self.pool);
551        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
552        self.last_process_time = Some(t0);
553
554        let ready = self.route_output_arrays(result.output_arrays);
555        let mut output = self.build_publish_batch(
556            ready,
557            result.param_updates,
558            result.scatter_index,
559            Some(array.as_ref()),
560            elapsed_ms,
561        );
562        output.batch.merge(self.build_status_params_batch());
563        Some(output)
564    }
565
566    /// A param batch carrying only the current DroppedArrays / queue counters,
567    /// used when an array is dropped before processing (B5).
568    fn dropped_arrays_only_batch(&self) -> ProcessOutput {
569        ProcessOutput {
570            arrays: vec![],
571            scatter_index: None,
572            batch: self.build_status_params_batch(),
573        }
574    }
575
576    /// Re-inject the cached previous input array through the normal process
577    /// path (C++ ProcessPlugin, NDPluginDriver.cpp:739-746, G5).
578    fn process_plugin(&mut self) -> Option<ProcessOutput> {
579        let prev = self.prev_input_array.clone()?;
580        self.process_and_publish(&prev)
581    }
582
583    /// Flush the sort buffer head-first while contiguous or stale (C++
584    /// sortingTask periodic tick). Does NOT drain non-contiguous fresh arrays.
585    fn tick_sort_buffer(&mut self) -> ProcessOutput {
586        let entries = self.sort_buffer.drain_ready(self.sort_time);
587        self.emit_drained(entries)
588    }
589
590    /// Drain the entire sort buffer in uniqueId order (sort mode turned off).
591    fn flush_sort_buffer(&mut self) -> ProcessOutput {
592        let entries = self.sort_buffer.drain_all();
593        self.emit_drained(entries)
594    }
595
596    fn emit_drained(&mut self, entries: Vec<(i32, Vec<Arc<NDArray>>)>) -> ProcessOutput {
597        let mut all_arrays = Vec::new();
598        let mut combined = ParamBatch::empty();
599        for (_unique_id, arrays) in entries {
600            let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
601            all_arrays.extend(output.arrays);
602            combined.merge(output.batch);
603        }
604        combined.merge(self.build_sort_params_batch());
605        ProcessOutput {
606            arrays: all_arrays,
607            scatter_index: None,
608            batch: combined,
609        }
610    }
611
612    fn build_sort_params_batch(&self) -> ParamBatch {
613        use asyn_rs::request::ParamSetValue;
614        let sort_free = self.sort_size - self.sort_buffer.len();
615        ParamBatch {
616            addr0: vec![
617                ParamSetValue::Int32 {
618                    reason: self.plugin_params.sort_free,
619                    addr: 0,
620                    value: sort_free,
621                },
622                ParamSetValue::Int32 {
623                    reason: self.plugin_params.disordered_arrays,
624                    addr: 0,
625                    value: self.sort_buffer.disordered_arrays,
626                },
627                ParamSetValue::Int32 {
628                    reason: self.plugin_params.dropped_output_arrays,
629                    addr: 0,
630                    value: self.sort_buffer.dropped_output_arrays,
631                },
632            ],
633            extra: std::collections::HashMap::new(),
634        }
635    }
636
637    /// Build a param batch carrying the runtime status counters:
638    /// DroppedArrays (G1) plus the sort/disorder counters.
639    fn build_status_params_batch(&self) -> ParamBatch {
640        use asyn_rs::request::ParamSetValue;
641        let mut batch = self.build_sort_params_batch();
642        batch.addr0.push(ParamSetValue::Int32 {
643            reason: self.plugin_params.dropped_arrays,
644            addr: 0,
645            value: self
646                .dropped_arrays
647                .load(std::sync::atomic::Ordering::Acquire),
648        });
649        batch
650    }
651
652    /// Build a ProcessOutput: fires direct interrupts (sync) and collects
653    /// param updates into a batch. Does NOT publish arrays — the caller
654    /// must publish them in async context.
655    fn build_publish_batch(
656        &mut self,
657        output_arrays: Vec<Arc<NDArray>>,
658        param_updates: Vec<ParamUpdate>,
659        scatter_index: Option<usize>,
660        fallback_array: Option<&NDArray>,
661        elapsed_ms: f64,
662    ) -> ProcessOutput {
663        use asyn_rs::request::ParamSetValue;
664
665        let mut addr0: Vec<ParamSetValue> = Vec::new();
666        let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
667            std::collections::HashMap::new();
668
669        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
670            self.array_counter += 1;
671
672            // Fire array data interrupt directly (C EPICS pattern).
673            if let Some(param) = self.std_array_data_param {
674                use crate::ndarray::NDDataBuffer;
675                use asyn_rs::param::ParamValue;
676                let value = match &report_arr.data {
677                    NDDataBuffer::I8(v) => {
678                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
679                    }
680                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
681                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
682                    ))),
683                    NDDataBuffer::I16(v) => {
684                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
685                    }
686                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
687                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
688                    ))),
689                    NDDataBuffer::I32(v) => {
690                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
691                    }
692                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
693                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
694                    ))),
695                    NDDataBuffer::I64(v) => {
696                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
697                    }
698                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
699                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
700                    ))),
701                    NDDataBuffer::F32(v) => {
702                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
703                    }
704                    NDDataBuffer::F64(v) => {
705                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
706                    }
707                };
708                if let Some(value) = value {
709                    let ts = report_arr.timestamp.to_system_time();
710                    self.port_handle
711                        .interrupts()
712                        .notify(asyn_rs::interrupt::InterruptValue {
713                            reason: param,
714                            addr: 0,
715                            value,
716                            timestamp: ts,
717                            uint32_changed_mask: 0,
718                            ..Default::default()
719                        });
720                }
721            }
722
723            let info = report_arr.info();
724            // B11: read ColorMode / BayerPattern from the NDArray attributes
725            // (C++ beginProcessCallbacks). `info()` already resolves the
726            // ColorMode attribute when present; fall back to it for the param.
727            let color_mode = report_arr
728                .attributes
729                .get("ColorMode")
730                .and_then(|a| a.value.as_i64())
731                .map(|v| v as i32)
732                .unwrap_or(info.color_mode as i32);
733            let bayer_pattern = report_arr
734                .attributes
735                .get("BayerPattern")
736                .and_then(|a| a.value.as_i64())
737                .map(|v| v as i32)
738                .unwrap_or(0);
739
740            // G8: fire an int32-array callback on NDDimensions when the array
741            // dimensions change (C++ beginProcessCallbacks dimsPrev_).
742            let cur_dims: Vec<i32> = report_arr.dims.iter().map(|d| d.size as i32).collect();
743            if cur_dims != self.dims_prev {
744                self.dims_prev = cur_dims.clone();
745                self.port_handle
746                    .interrupts()
747                    .notify(asyn_rs::interrupt::InterruptValue {
748                        reason: self.ndarray_params.array_dimensions,
749                        addr: 0,
750                        value: asyn_rs::param::ParamValue::Int32Array(std::sync::Arc::from(
751                            cur_dims.as_slice(),
752                        )),
753                        timestamp: report_arr.timestamp.to_system_time(),
754                        uint32_changed_mask: 0,
755                        ..Default::default()
756                    });
757            }
758
759            addr0.extend([
760                ParamSetValue::Int32 {
761                    reason: self.ndarray_params.array_counter,
762                    addr: 0,
763                    value: self.array_counter,
764                },
765                ParamSetValue::Int32 {
766                    reason: self.ndarray_params.unique_id,
767                    addr: 0,
768                    value: report_arr.unique_id,
769                },
770                ParamSetValue::Int32 {
771                    reason: self.ndarray_params.n_dimensions,
772                    addr: 0,
773                    value: report_arr.dims.len() as i32,
774                },
775                ParamSetValue::Int32 {
776                    reason: self.ndarray_params.array_size_x,
777                    addr: 0,
778                    value: info.x_size as i32,
779                },
780                ParamSetValue::Int32 {
781                    reason: self.ndarray_params.array_size_y,
782                    addr: 0,
783                    value: info.y_size as i32,
784                },
785                ParamSetValue::Int32 {
786                    reason: self.ndarray_params.array_size_z,
787                    addr: 0,
788                    value: info.color_size as i32,
789                },
790                ParamSetValue::Int32 {
791                    reason: self.ndarray_params.array_size,
792                    addr: 0,
793                    value: info.total_bytes as i32,
794                },
795                ParamSetValue::Int32 {
796                    reason: self.ndarray_params.data_type,
797                    addr: 0,
798                    value: report_arr.data.data_type() as i32,
799                },
800                ParamSetValue::Int32 {
801                    reason: self.ndarray_params.color_mode,
802                    addr: 0,
803                    value: color_mode,
804                },
805                ParamSetValue::Int32 {
806                    reason: self.ndarray_params.bayer_pattern,
807                    addr: 0,
808                    value: bayer_pattern,
809                },
810                ParamSetValue::Float64 {
811                    reason: self.ndarray_params.timestamp_rbv,
812                    addr: 0,
813                    value: report_arr.timestamp.as_f64(),
814                },
815                ParamSetValue::Int32 {
816                    reason: self.ndarray_params.epics_ts_sec,
817                    addr: 0,
818                    value: report_arr.timestamp.sec as i32,
819                },
820                ParamSetValue::Int32 {
821                    reason: self.ndarray_params.epics_ts_nsec,
822                    addr: 0,
823                    value: report_arr.timestamp.nsec as i32,
824                },
825            ]);
826        }
827
828        addr0.push(ParamSetValue::Float64 {
829            reason: self.plugin_params.execution_time,
830            addr: 0,
831            value: elapsed_ms,
832        });
833
834        // ArrayRate_RBV is computed by a calc record in the DB template
835        // (SCAN "1 second", reading ArrayCounter_RBV delta), not in Rust.
836
837        // Plugin-specific param updates.
838        for update in &param_updates {
839            match update {
840                ParamUpdate::Int32 {
841                    reason,
842                    addr,
843                    value,
844                } => {
845                    let pv = ParamSetValue::Int32 {
846                        reason: *reason,
847                        addr: *addr,
848                        value: *value,
849                    };
850                    if *addr == 0 {
851                        addr0.push(pv);
852                    } else {
853                        extra.entry(*addr).or_default().push(pv);
854                    }
855                }
856                ParamUpdate::Float64 {
857                    reason,
858                    addr,
859                    value,
860                } => {
861                    let pv = ParamSetValue::Float64 {
862                        reason: *reason,
863                        addr: *addr,
864                        value: *value,
865                    };
866                    if *addr == 0 {
867                        addr0.push(pv);
868                    } else {
869                        extra.entry(*addr).or_default().push(pv);
870                    }
871                }
872                ParamUpdate::Octet {
873                    reason,
874                    addr,
875                    value,
876                } => {
877                    let pv = ParamSetValue::Octet {
878                        reason: *reason,
879                        addr: *addr,
880                        value: value.clone(),
881                    };
882                    if *addr == 0 {
883                        addr0.push(pv);
884                    } else {
885                        extra.entry(*addr).or_default().push(pv);
886                    }
887                }
888                ParamUpdate::Float64Array {
889                    reason,
890                    addr,
891                    value,
892                } => {
893                    let pv = ParamSetValue::Float64Array {
894                        reason: *reason,
895                        addr: *addr,
896                        value: value.clone(),
897                    };
898                    if *addr == 0 {
899                        addr0.push(pv);
900                    } else {
901                        extra.entry(*addr).or_default().push(pv);
902                    }
903                }
904            }
905        }
906
907        ProcessOutput {
908            arrays: output_arrays,
909            scatter_index,
910            batch: ParamBatch { addr0, extra },
911        }
912    }
913}
914
915/// Output from processing: arrays to publish + param batch to flush.
916struct ProcessOutput {
917    arrays: Vec<Arc<NDArray>>,
918    scatter_index: Option<usize>,
919    batch: ParamBatch,
920}
921
922impl ProcessOutput {
923    /// Publish arrays to downstream senders (async, concurrent fan-out).
924    ///
925    /// Each array is published to all senders concurrently (independent
926    /// backpressure per sender). Arrays are published in order — the next
927    /// array's fan-out starts only after the previous one completes.
928    async fn publish_arrays(&self, senders: &[NDArraySender]) {
929        for arr in &self.arrays {
930            if let Some(idx) = self.scatter_index {
931                if let Some(sender) = senders.get(idx % senders.len().max(1)) {
932                    sender.publish(arr.clone()).await;
933                }
934            } else {
935                let futs = senders.iter().map(|s| s.publish(arr.clone()));
936                futures_util::future::join_all(futs).await;
937            }
938        }
939    }
940}
941
942/// Collected param updates ready to be flushed to the actor.
943/// Produced by `build_publish_batch()`, consumed by async `flush()`.
944struct ParamBatch {
945    addr0: Vec<asyn_rs::request::ParamSetValue>,
946    extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
947}
948
949impl ParamBatch {
950    fn empty() -> Self {
951        Self {
952            addr0: Vec::new(),
953            extra: std::collections::HashMap::new(),
954        }
955    }
956
957    fn merge(&mut self, other: ParamBatch) {
958        self.addr0.extend(other.addr0);
959        for (addr, updates) in other.extra {
960            self.extra.entry(addr).or_default().extend(updates);
961        }
962    }
963
964    /// Flush via reliable async enqueue. Call from async context.
965    async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
966        if !self.addr0.is_empty() {
967            if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
968                eprintln!("plugin param flush error (addr 0): {e}");
969            }
970        }
971        for (addr, updates) in self.extra {
972            if let Err(e) = port.set_params_and_notify(addr, updates).await {
973                eprintln!("plugin param flush error (addr {addr}): {e}");
974            }
975        }
976    }
977}
978
979/// PortDriver implementation for a plugin's control plane.
980#[allow(dead_code)]
981pub struct PluginPortDriver {
982    base: PortDriverBase,
983    ndarray_params: NDArrayDriverParams,
984    plugin_params: PluginBaseParams,
985    param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
986    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
987    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
988    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
989    std_array_data_param: Option<usize>,
990}
991
992impl PluginPortDriver {
993    fn new<P: NDPluginProcess>(
994        port_name: &str,
995        plugin_type_name: &str,
996        queue_size: usize,
997        ndarray_port: &str,
998        max_addr: usize,
999        param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
1000        processor: &mut P,
1001        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
1002    ) -> AsynResult<Self> {
1003        let mut base = PortDriverBase::new(
1004            port_name,
1005            max_addr,
1006            PortFlags {
1007                can_block: true,
1008                ..Default::default()
1009            },
1010        );
1011
1012        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
1013        let plugin_params = PluginBaseParams::create(&mut base)?;
1014
1015        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
1016        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
1017        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
1018        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
1019        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
1020        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
1021        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
1022        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
1023        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
1024        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
1025        base.set_int32_param(ndarray_params.capture, 0, 0)?;
1026        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
1027        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
1028        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
1029        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
1030        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
1031        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
1032        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
1033        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
1034        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
1035        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
1036
1037        // Set plugin identity params
1038        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
1039        base.set_string_param(
1040            ndarray_params.ad_core_version,
1041            0,
1042            env!("CARGO_PKG_VERSION").into(),
1043        )?;
1044        base.set_string_param(
1045            ndarray_params.driver_version,
1046            0,
1047            env!("CARGO_PKG_VERSION").into(),
1048        )?;
1049        if !ndarray_port.is_empty() {
1050            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
1051        }
1052
1053        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
1054        let std_array_data_param = if array_data.is_some() {
1055            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
1056        } else {
1057            None
1058        };
1059
1060        // Let the processor register its plugin-specific params
1061        processor.register_params(&mut base)?;
1062
1063        Ok(Self {
1064            base,
1065            ndarray_params,
1066            plugin_params,
1067            param_change_tx,
1068            array_data,
1069            std_array_data_param,
1070        })
1071    }
1072}
1073
/// Copy the leading elements of `src` into `dst` without any conversion.
///
/// Copies `min(src.len(), dst.len())` elements and returns that count.
/// Elements of `dst` beyond the copied prefix are left untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = dst.len().min(src.len());
    let (head, _) = dst.split_at_mut(count);
    head.copy_from_slice(&src[..count]);
    count
}
1080
1081/// Convert and copy source slice into destination buffer element-by-element.
1082fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
1083where
1084    S: CastToF64 + Copy,
1085    D: CastFromF64 + Copy,
1086{
1087    let n = src.len().min(dst.len());
1088    for i in 0..n {
1089        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
1090    }
1091    n
1092}
1093
/// Integer-to-integer element conversion with C cast semantics.
///
/// C++ `NDArrayPool::convert` (`NDArrayPool.cpp:388`, `convertType`:
/// `*pDataOut++ = (dataTypeOut)(*pDataIn++)`; and `:466`, `convertDim`)
/// converts between integer types with a plain C cast, which means:
///   - a same-width sign change reinterprets the bits
///     (`(epicsInt8)(epicsUInt8)255 == -1`);
///   - narrowing keeps only the low bits and therefore wraps
///     (`(epicsInt8)(epicsUInt16)300 == 44`);
///   - widening sign- or zero-extends exactly.
///
/// Rust's integer `as` cast has exactly these semantics. Going through `f64`
/// as [`copy_convert`] does is NOT equivalent: it saturates on narrowing
/// (`300.0 as i8 == 127`). Every integer-source -> integer-target NDArray
/// array read must therefore use this trait rather than `copy_convert`.
trait CCastTo<D> {
    /// Convert `self` to `D` the way a C cast would.
    fn ccast(self) -> D;
}

/// Generate [`CCastTo`] impls.
///
/// Accepts either a table of `src => [dst, ...];` rows or the single-row
/// form `src => dst, ...`. The table form expands row by row into the
/// single-row form, which emits one `impl CCastTo<dst> for src` per target.
macro_rules! impl_ccast {
    // Table form. Must come first: the single-row arm would try to parse the
    // bracketed list as a type.
    ( $( $src:ty => [ $( $dst:ty ),+ ] );+ $(;)? ) => {
        $( impl_ccast!($src => $( $dst ),+); )+
    };
    // Single-row form.
    ( $src:ty => $( $dst:ty ),+ ) => {
        $(
            impl CCastTo<$dst> for $src {
                #[inline]
                fn ccast(self) -> $dst {
                    self as $dst
                }
            }
        )+
    };
}

// Each source type is listed with every *other* signed target it can be read
// as; the identical-type case is handled by a direct copy instead.
impl_ccast! {
    i8 => [i16, i32, i64];
    u8 => [i8, i16, i32, i64];
    i16 => [i8, i32, i64];
    u16 => [i8, i16, i32, i64];
    i32 => [i8, i16, i64];
    u32 => [i8, i16, i32, i64];
    i64 => [i8, i16, i32];
    u64 => [i8, i16, i32, i64];
}
1132
1133/// Copy an integer source slice into an integer destination buffer using C
1134/// cast semantics (see [`CCastTo`]) — truncating on narrowing, never
1135/// saturating.
1136fn copy_ccast<S, D>(src: &[S], dst: &mut [D]) -> usize
1137where
1138    S: CCastTo<D> + Copy,
1139    D: Copy,
1140{
1141    let n = src.len().min(dst.len());
1142    for i in 0..n {
1143        dst[i] = src[i].ccast();
1144    }
1145    n
1146}
1147
/// Widening helper: convert an element to `f64` with `as` semantics.
///
/// For `i64`/`u64` the conversion can be lossy, because `f64` cannot
/// represent every 64-bit integer exactly.
trait CastToF64 {
    /// Return `self` as an `f64`.
    fn cast_to_f64(self) -> f64;
}

/// Implement [`CastToF64`] via a plain `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64 {
    ( $( $elem:ty ),+ $(,)? ) => {
        $(
            impl CastToF64 for $elem {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )+
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

// `f64` is already the target type: identity.
impl CastToF64 for f64 {
    fn cast_to_f64(self) -> f64 {
        self
    }
}
1203
/// Narrowing helper: build an element from an `f64` with `as` semantics.
trait CastFromF64 {
    /// Convert `v` to `Self` using an `as` cast.
    fn cast_from_f64(v: f64) -> Self;
}

/// Implement [`CastFromF64`] via a plain `as` cast for each listed type.
macro_rules! impl_cast_from_f64 {
    ( $( $elem:ty ),+ $(,)? ) => {
        $(
            impl CastFromF64 for $elem {
                fn cast_from_f64(v: f64) -> Self {
                    v as $elem
                }
            }
        )+
    };
}

impl_cast_from_f64!(i8, i16, i32, i64, f32);

// `f64` is already the source type: identity.
impl CastFromF64 for f64 {
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
1239
/// Body of the `read_*_array` methods: copy the latest NDArray into `$buf`,
/// converting the element type when necessary.
///
/// Arguments:
/// - `$self`: the driver, whose `array_data` handle is read;
/// - `$buf`: the destination slice;
/// - `$direct_variant`: the `NDDataBuffer` variant whose element type equals
///   the destination type (plain copy via `copy_direct`);
/// - `ccast: [...]`: variants converted with C cast semantics (`copy_ccast`);
/// - `convert: [...]`: variants converted through `f64` (`copy_convert`).
///
/// Evaluates to `Ok(n)` with the number of elements copied. When the driver
/// has no array handle, or no array has been stored yet, the *enclosing
/// function* returns `Ok(0)` early. The handle's lock is held for the
/// duration of the copy.
macro_rules! impl_read_array {
    (
        $self:expr, $buf:expr, $direct_variant:ident,
        ccast: [ $( $ccast_variant:ident ),* ],
        convert: [ $( $convert_variant:ident ),* ]
    ) => {{
        use crate::ndarray::NDDataBuffer;
        let shared = if let Some(handle) = &$self.array_data {
            handle
        } else {
            return Ok(0);
        };
        let latest = shared.lock();
        let frame = if let Some(array) = &*latest {
            array
        } else {
            return Ok(0);
        };
        let copied = match &frame.data {
            NDDataBuffer::$direct_variant(elems) => copy_direct(elems, $buf),
            $( NDDataBuffer::$ccast_variant(elems) => copy_ccast(elems, $buf), )*
            $( NDDataBuffer::$convert_variant(elems) => copy_convert(elems, $buf), )*
        };
        Ok(copied)
    }};
}
1266
1267impl PortDriver for PluginPortDriver {
1268    fn base(&self) -> &PortDriverBase {
1269        &self.base
1270    }
1271
1272    fn base_mut(&mut self) -> &mut PortDriverBase {
1273        &mut self.base
1274    }
1275
1276    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
1277        let reason = user.reason;
1278        let addr = user.addr;
1279        self.base.set_int32_param(reason, addr, value)?;
1280        self.base.call_param_callbacks(addr)?;
1281        // B14: reliable send on an unbounded channel — never drop param changes.
1282        let _ = self
1283            .param_change_tx
1284            .send((reason, addr, ParamChangeValue::Int32(value)));
1285        Ok(())
1286    }
1287
1288    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
1289        let reason = user.reason;
1290        let addr = user.addr;
1291        self.base.set_float64_param(reason, addr, value)?;
1292        self.base.call_param_callbacks(addr)?;
1293        let _ = self
1294            .param_change_tx
1295            .send((reason, addr, ParamChangeValue::Float64(value)));
1296        Ok(())
1297    }
1298
1299    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
1300        let reason = user.reason;
1301        let addr = user.addr;
1302        let s = String::from_utf8_lossy(data).into_owned();
1303        self.base.set_string_param(reason, addr, s.clone())?;
1304        self.base.call_param_callbacks(addr)?;
1305        let _ = self
1306            .param_change_tx
1307            .send((reason, addr, ParamChangeValue::Octet(s)));
1308        Ok(())
1309    }
1310
1311    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
1312        // Every integer source -> i8 is a C cast (truncating, per C++
1313        // NDArrayPool.cpp:388); float sources keep the numeric f64 conversion.
1314        impl_read_array!(
1315            self, buf, I8,
1316            ccast: [U8, I16, U16, I32, U32, I64, U64],
1317            convert: [F32, F64]
1318        )
1319    }
1320
1321    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
1322        impl_read_array!(
1323            self, buf, I16,
1324            ccast: [I8, U8, U16, I32, U32, I64, U64],
1325            convert: [F32, F64]
1326        )
1327    }
1328
1329    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1330        impl_read_array!(
1331            self, buf, I32,
1332            ccast: [I8, U8, I16, U16, U32, I64, U64],
1333            convert: [F32, F64]
1334        )
1335    }
1336
1337    fn read_int64_array(&mut self, _user: &AsynUser, buf: &mut [i64]) -> AsynResult<usize> {
1338        impl_read_array!(
1339            self, buf, I64,
1340            ccast: [I8, U8, I16, U16, I32, U32, U64],
1341            convert: [F32, F64]
1342        )
1343    }
1344
1345    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1346        impl_read_array!(
1347            self, buf, F32,
1348            ccast: [],
1349            convert: [I8, U8, I16, U16, I32, U32, I64, U64, F64]
1350        )
1351    }
1352
1353    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1354        impl_read_array!(
1355            self, buf, F64,
1356            ccast: [],
1357            convert: [I8, U8, I16, U16, I32, U32, I64, U64, F32]
1358        )
1359    }
1360}
1361
1362/// Handle to a running plugin runtime. Provides access to sender and port handle.
1363#[derive(Clone)]
1364pub struct PluginRuntimeHandle {
1365    port_runtime: PortRuntimeHandle,
1366    array_sender: NDArraySender,
1367    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
1368    port_name: String,
1369    pub ndarray_params: NDArrayDriverParams,
1370    pub plugin_params: PluginBaseParams,
1371}
1372
1373impl PluginRuntimeHandle {
1374    pub fn port_runtime(&self) -> &PortRuntimeHandle {
1375        &self.port_runtime
1376    }
1377
1378    pub fn array_sender(&self) -> &NDArraySender {
1379        &self.array_sender
1380    }
1381
1382    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
1383        &self.array_output
1384    }
1385
1386    pub fn port_name(&self) -> &str {
1387        &self.port_name
1388    }
1389}
1390
1391/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
1392///
1393/// Returns:
1394/// - `PluginRuntimeHandle` for wiring and control
1395/// - `PortRuntimeHandle` for param I/O
1396/// - `JoinHandle` for the data processing thread
1397pub fn create_plugin_runtime<P: NDPluginProcess>(
1398    port_name: &str,
1399    processor: P,
1400    pool: Arc<NDArrayPool>,
1401    queue_size: usize,
1402    ndarray_port: &str,
1403    wiring: Arc<WiringRegistry>,
1404) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1405    create_plugin_runtime_multi_addr(
1406        port_name,
1407        processor,
1408        pool,
1409        queue_size,
1410        ndarray_port,
1411        wiring,
1412        1,
1413    )
1414}
1415
1416/// Create a plugin runtime with multi-addr support.
1417///
1418/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
1419pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1420    port_name: &str,
1421    mut processor: P,
1422    pool: Arc<NDArrayPool>,
1423    queue_size: usize,
1424    ndarray_port: &str,
1425    wiring: Arc<WiringRegistry>,
1426    max_addr: usize,
1427) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1428    // Param change channel (control plane -> data plane)
1429    // B14: unbounded so control-plane param changes (e.g. autosave restoring
1430    // hundreds of PVs at IOC init) are never silently dropped before the
1431    // data plane sees them.
1432    let (param_tx, param_rx) =
1433        tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
1434
1435    // Capture plugin type and array data handle before mutable borrow
1436    let plugin_type_name = processor.plugin_type().to_string();
1437    let compression_aware = processor.compression_aware();
1438    let array_data = processor.array_data_handle();
1439
1440    // Create the port driver for control plane
1441    let driver = PluginPortDriver::new(
1442        port_name,
1443        &plugin_type_name,
1444        queue_size,
1445        ndarray_port,
1446        max_addr,
1447        param_tx,
1448        &mut processor,
1449        array_data,
1450    )
1451    .expect("failed to create plugin port driver");
1452
1453    let ndarray_params = driver.ndarray_params;
1454    let plugin_params = driver.plugin_params;
1455    let std_array_data_param = driver.std_array_data_param;
1456
1457    // Create port runtime (actor thread for param I/O)
1458    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1459
1460    // Clone port handle for the data thread to write params back
1461    let port_handle = port_runtime.port_handle().clone();
1462
1463    // Array channel (data plane)
1464    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1465
1466    // Shared mode flags
1467    let enabled = Arc::new(AtomicBool::new(false));
1468    let blocking_mode = Arc::new(AtomicBool::new(false));
1469
1470    // Shared processor (accessible from data thread)
1471    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1472    let array_output_for_handle = array_output.clone();
1473    // B13/G6: register this plugin's output so the WiringRegistry is the
1474    // single source of truth for runtime rewiring (PluginManager::add_plugin
1475    // would also register it, but direct callers must not bypass the
1476    // registry). Registered under every address in 0..max_addr so a
1477    // downstream plugin can select a non-zero NDArrayAddr.
1478    wiring.register_output_addrs(port_name, max_addr, array_output.clone());
1479    // G1/B1: the DroppedArrays counter is owned by this plugin and shared with
1480    // every upstream sender so full-queue drops on our input queue are counted.
1481    let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
1482    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1483        processor,
1484        output: array_output,
1485        pool,
1486        ndarray_params,
1487        plugin_params,
1488        port_handle,
1489        array_counter: 0,
1490        std_array_data_param,
1491        min_callback_time: 0.0,
1492        last_process_time: None,
1493        sort_mode: 0,
1494        sort_time: 0.0,
1495        sort_size: 10,
1496        sort_buffer: SortBuffer::new(),
1497        dropped_arrays: dropped_arrays_counter,
1498        compression_aware,
1499        max_byte_rate: 0.0,
1500        throttler: super::throttler::Throttler::new(0.0),
1501        prev_input_array: None,
1502        dims_prev: Vec::new(),
1503        nd_array_addr: 0,
1504        max_threads: 1,
1505        num_threads: 1,
1506    }));
1507
1508    let data_enabled = enabled.clone();
1509    let data_blocking = blocking_mode.clone();
1510
1511    let mut array_sender = array_sender;
1512    array_sender.set_mode_flags(enabled, blocking_mode);
1513
1514    // Capture wiring info for data loop
1515    let sender_port_name = port_name.to_string();
1516    let initial_upstream = ndarray_port.to_string();
1517
1518    // Spawn data processing thread
1519    let data_jh = thread::Builder::new()
1520        .name(format!("plugin-data-{port_name}"))
1521        .spawn(move || {
1522            plugin_data_loop(
1523                shared,
1524                array_rx,
1525                param_rx,
1526                plugin_params,
1527                ndarray_params.array_counter,
1528                data_enabled,
1529                data_blocking,
1530                sender_port_name,
1531                initial_upstream,
1532                wiring,
1533            );
1534        })
1535        .expect("failed to spawn plugin data thread");
1536
1537    let handle = PluginRuntimeHandle {
1538        port_runtime,
1539        array_sender,
1540        array_output: array_output_for_handle,
1541        port_name: port_name.to_string(),
1542        ndarray_params,
1543        plugin_params,
1544    };
1545
1546    (handle, data_jh)
1547}
1548
1549/// Build a param batch reporting the input-queue depth.
1550///
1551/// `QUEUE_SIZE` = total capacity, `QUEUE_FREE` = free slots. G2: the param is
1552/// named `QUEUE_FREE` and the reconciled semantics are *free slots*, matching
1553/// C++ `NDPluginDriverQueueFree = queueSize - pending()`.
1554fn queue_status_batch(
1555    plugin_params: &PluginBaseParams,
1556    max_capacity: usize,
1557    free: i32,
1558) -> ParamBatch {
1559    use asyn_rs::request::ParamSetValue;
1560    ParamBatch {
1561        addr0: vec![
1562            ParamSetValue::Int32 {
1563                reason: plugin_params.queue_size,
1564                addr: 0,
1565                value: max_capacity as i32,
1566            },
1567            ParamSetValue::Int32 {
1568                reason: plugin_params.queue_use,
1569                addr: 0,
1570                value: free,
1571            },
1572        ],
1573        extra: std::collections::HashMap::new(),
1574    }
1575}
1576
1577/// Write a validated/clamped int32 value back into the param library so the
1578/// RBV reflects the accepted value (G4 NumThreads/MaxThreads clamping).
1579async fn clamp_writeback(port: &PortHandle, reason: usize, value: i32) {
1580    use asyn_rs::request::ParamSetValue;
1581    let _ = port
1582        .set_params_and_notify(
1583            0,
1584            vec![ParamSetValue::Int32 {
1585                reason,
1586                addr: 0,
1587                value,
1588            }],
1589        )
1590        .await;
1591}
1592
1593fn plugin_data_loop<P: NDPluginProcess>(
1594    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1595    mut array_rx: NDArrayReceiver,
1596    mut param_rx: tokio::sync::mpsc::UnboundedReceiver<(usize, i32, ParamChangeValue)>,
1597    plugin_params: PluginBaseParams,
1598    array_counter_reason: usize,
1599    enabled: Arc<AtomicBool>,
1600    blocking_mode: Arc<AtomicBool>,
1601    sender_port_name: String,
1602    initial_upstream: String,
1603    wiring: Arc<WiringRegistry>,
1604) {
1605    let enable_callbacks_reason = plugin_params.enable_callbacks;
1606    let blocking_callbacks_reason = plugin_params.blocking_callbacks;
1607    let min_callback_time_reason = plugin_params.min_callback_time;
1608    let sort_mode_reason = plugin_params.sort_mode;
1609    let sort_time_reason = plugin_params.sort_time;
1610    let sort_size_reason = plugin_params.sort_size;
1611    let nd_array_port_reason = plugin_params.nd_array_port;
1612    let nd_array_addr_reason = plugin_params.nd_array_addr;
1613    let process_plugin_reason = plugin_params.process_plugin;
1614    let max_byte_rate_reason = plugin_params.max_byte_rate;
1615    let num_threads_reason = plugin_params.num_threads;
1616    let max_threads_reason = plugin_params.max_threads;
1617    // G6: the upstream connection is keyed by (port, addr). `current_upstream`
1618    // is the base port name; `current_addr` is the selected NDArrayAddr; the
1619    // effective WiringRegistry key is computed by `upstream_key`.
1620    let mut current_upstream = initial_upstream;
1621    let mut current_addr: i32 = 0;
1622    let rt = tokio::runtime::Builder::new_current_thread()
1623        .enable_all()
1624        .build()
1625        .unwrap();
1626    rt.block_on(async {
1627        // Sort flush timer — starts disabled (very long interval).
1628        // Re-created when sort_time changes.
1629        let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1630        let mut sort_flush_active = false;
1631        // Last published QueueFree value — only flush the queue params when it
1632        // changes, so a steady queue depth does not spam param callbacks.
1633        let mut last_queue_free: Option<i32> = None;
1634
1635        loop {
1636            tokio::select! {
1637                msg = array_rx.recv_msg() => {
1638                    match msg {
1639                        Some(msg) => {
1640                            // B6: quiesce is synchronous — if callbacks were
1641                            // disabled (the param arm flips `enabled` before
1642                            // any further array message is handled), drop the
1643                            // array here without processing.
1644                            if !enabled.load(Ordering::Acquire) {
1645                                continue;
1646                            }
1647                            // Process array and collect output (sync, under lock).
1648                            let (process_output, senders, port) = {
1649                                let mut guard = shared.lock();
1650                                // G3: a non-compression-aware plugin must drop
1651                                // a compressed array (codec set) and count it
1652                                // (C++ driverCallback NDPluginDriver.cpp:383-394).
1653                                let compressed = msg.array.codec.is_some();
1654                                let output = if compressed && !guard.compression_aware {
1655                                    guard
1656                                        .dropped_arrays
1657                                        .fetch_add(1, Ordering::AcqRel);
1658                                    Some(guard.dropped_arrays_only_batch())
1659                                } else {
1660                                    // R2/G5: process_and_publish caches the
1661                                    // input array for ProcessPlugin only after
1662                                    // the MinCallbackTime gate passes — a
1663                                    // throttled frame is never cached.
1664                                    guard.process_and_publish(&msg.array)
1665                                };
1666                                let senders = guard.output.lock().senders_clone();
1667                                let port = guard.port_handle.clone();
1668                                (output, senders, port)
1669                            };
1670                            // G2: update QueueSize/QueueFree from the channel
1671                            // depth (C++ NDPluginDriver.cpp:512-513). QueueFree
1672                            // = max_capacity - pending. Only flush when the
1673                            // value changed to avoid no-op param callbacks.
1674                            let max_cap = array_rx.max_capacity();
1675                            let free = max_cap.saturating_sub(array_rx.pending()) as i32;
1676                            let queue_batch = if last_queue_free != Some(free) {
1677                                last_queue_free = Some(free);
1678                                Some(queue_status_batch(&plugin_params, max_cap, free))
1679                            } else {
1680                                None
1681                            };
1682                            // msg dropped here → completion signaled (if tracked)
1683                            // Publish arrays and flush params outside the lock, in async context.
1684                            if let Some(po) = process_output {
1685                                po.publish_arrays(&senders).await;
1686                                po.batch.flush(&port).await;
1687                            }
1688                            if let Some(qb) = queue_batch {
1689                                qb.flush(&port).await;
1690                            }
1691                        }
1692                        None => break,
1693                    }
1694                }
1695                param = param_rx.recv() => {
1696                    match param {
1697                        Some((reason, addr, value)) => {
1698                            if reason == enable_callbacks_reason {
1699                                let on = value.as_i32() != 0;
1700                                enabled.store(on, Ordering::Release);
1701                                // B6: disabling releases the cached input array
1702                                // (C++ writeInt32 NDPluginDriver.cpp:712-722).
1703                                if !on {
1704                                    shared.lock().prev_input_array = None;
1705                                }
1706                            }
1707                            if reason == blocking_callbacks_reason {
1708                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1709                            }
1710                            // Handle MinCallbackTime param change
1711                            if reason == min_callback_time_reason {
1712                                shared.lock().min_callback_time = value.as_f64();
1713                            }
1714                            // G7: MaxByteRate change resets the output throttler
1715                            // (C++ writeFloat64 NDPluginDriver.cpp:788-790).
1716                            if reason == max_byte_rate_reason {
1717                                let rate = value.as_f64();
1718                                let mut guard = shared.lock();
1719                                guard.max_byte_rate = rate;
1720                                guard.throttler.reset(rate);
1721                            }
1722                            // G4: NumThreads / MaxThreads are validated and
1723                            // clamped on write. The Rust port is intentionally
1724                            // single-threaded per plugin (one tokio task) — see
1725                            // the module note — so NumThreads is clamped to
1726                            // [1, MaxThreads] and the clamped value written back
1727                            // rather than spawning a worker pool.
1728                            if reason == max_threads_reason {
1729                                // Scope the guard so it is released before await.
1730                                let (port, clamped, mt) = {
1731                                    let mut guard = shared.lock();
1732                                    guard.max_threads = value.as_i32().max(1);
1733                                    let clamped =
1734                                        guard.num_threads.clamp(1, guard.max_threads);
1735                                    guard.num_threads = clamped;
1736                                    (guard.port_handle.clone(), clamped, guard.max_threads)
1737                                };
1738                                clamp_writeback(&port, num_threads_reason, clamped).await;
1739                                clamp_writeback(&port, max_threads_reason, mt).await;
1740                            }
1741                            if reason == num_threads_reason {
1742                                let (port, clamped) = {
1743                                    let mut guard = shared.lock();
1744                                    let clamped =
1745                                        value.as_i32().clamp(1, guard.max_threads.max(1));
1746                                    guard.num_threads = clamped;
1747                                    (guard.port_handle.clone(), clamped)
1748                                };
1749                                clamp_writeback(&port, num_threads_reason, clamped).await;
1750                            }
1751                            // G6: NDArrayAddr selects a source address of a
1752                            // multi-address driver — reconnect on change
1753                            // (C++ writeInt32 NDPluginDriver.cpp:724-728).
1754                            if reason == nd_array_addr_reason {
1755                                let new_addr = value.as_i32();
1756                                if new_addr != current_addr {
1757                                    let old_key = upstream_key(&current_upstream, current_addr);
1758                                    let new_key = upstream_key(&current_upstream, new_addr);
1759                                    shared.lock().nd_array_addr = new_addr;
1760                                    match wiring.rewire_by_name(
1761                                        &sender_port_name,
1762                                        &old_key,
1763                                        &new_key,
1764                                    ) {
1765                                        Ok(()) => current_addr = new_addr,
1766                                        Err(e) => {
1767                                            eprintln!("NDArrayAddr reconnect failed: {e}");
1768                                            shared.lock().nd_array_addr = current_addr;
1769                                        }
1770                                    }
1771                                }
1772                            }
1773                            // G5: ProcessPlugin re-injects the cached input
1774                            // array (C++ writeInt32 NDPluginDriver.cpp:739-746).
1775                            if reason == process_plugin_reason && value.as_i32() != 0 {
1776                                let (process_output, senders, port) = {
1777                                    let mut guard = shared.lock();
1778                                    let output = guard.process_plugin();
1779                                    let senders = guard.output.lock().senders_clone();
1780                                    let port = guard.port_handle.clone();
1781                                    (output, senders, port)
1782                                };
1783                                if let Some(po) = process_output {
1784                                    po.publish_arrays(&senders).await;
1785                                    po.batch.flush(&port).await;
1786                                } else {
1787                                    // C parity: NDPluginDriver::writeInt32
1788                                    // (NDPluginDriver.cpp:743) logs this at
1789                                    // ASYN_TRACE_WARNING, which is OFF in the
1790                                    // default port trace mask. Gate through the
1791                                    // port's trace facility so iocInit stays
1792                                    // silent unless WARNING is enabled, instead
1793                                    // of an unconditional eprintln! that spams
1794                                    // stderr on every PINI ProcessPlugin trigger.
1795                                    // The asyn port registry that exposes the
1796                                    // per-port mask only exists with the `ioc`
1797                                    // integration; a bare plugin build has no
1798                                    // trace facility to consult, so it stays
1799                                    // silent there too.
1800                                    #[cfg(feature = "ioc")]
1801                                    if let Some(entry) =
1802                                        asyn_rs::asyn_record::get_port(&sender_port_name)
1803                                    {
1804                                        asyn_rs::asyn_trace!(
1805                                            entry.trace,
1806                                            sender_port_name.as_str(),
1807                                            asyn_rs::trace::TraceMask::WARNING,
1808                                            "plugin {sender_port_name}: ProcessPlugin \
1809                                             requested but no input array cached"
1810                                        );
1811                                    }
1812                                }
1813                            }
1814                            // B12: a control-plane write of ArrayCounter resets
1815                            // the working counter (C++ keeps NDArrayCounter in
1816                            // the param library; beginProcessCallbacks reads it).
1817                            if reason == array_counter_reason {
1818                                shared.lock().array_counter = value.as_i32();
1819                            }
1820                            // Handle sort param changes
1821                            if reason == sort_mode_reason {
1822                                let mode = value.as_i32();
1823                                // Scope the guard so clippy can verify the lock
1824                                // is released before any await.
1825                                let flush_work = {
1826                                    let mut guard = shared.lock();
1827                                    guard.sort_mode = mode;
1828                                    if mode == 0 {
1829                                        let output = guard.flush_sort_buffer();
1830                                        let senders = guard.output.lock().senders_clone();
1831                                        let port = guard.port_handle.clone();
1832                                        sort_flush_active = false;
1833                                        Some((output, senders, port))
1834                                    } else {
1835                                        sort_flush_active = guard.sort_time > 0.0;
1836                                        if sort_flush_active {
1837                                            let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1838                                            sort_flush_interval = tokio::time::interval(dur);
1839                                        }
1840                                        None
1841                                    }
1842                                };
1843                                if let Some((output, senders, port)) = flush_work {
1844                                    output.publish_arrays(&senders).await;
1845                                    output.batch.flush(&port).await;
1846                                }
1847                            }
1848                            if reason == sort_time_reason {
1849                                let t = value.as_f64();
1850                                let mut guard = shared.lock();
1851                                guard.sort_time = t;
1852                                if guard.sort_mode != 0 && t > 0.0 {
1853                                    sort_flush_active = true;
1854                                    let dur = std::time::Duration::from_secs_f64(t);
1855                                    sort_flush_interval = tokio::time::interval(dur);
1856                                } else {
1857                                    sort_flush_active = false;
1858                                }
1859                                drop(guard);
1860                            }
1861                            if reason == sort_size_reason {
1862                                shared.lock().sort_size = value.as_i32();
1863                            }
1864                            // Handle NDArrayPort rewiring — keyed by (port, addr).
1865                            if reason == nd_array_port_reason {
1866                                if let Some(new_port) = value.as_string() {
1867                                    if new_port != current_upstream {
1868                                        let old_key =
1869                                            upstream_key(&current_upstream, current_addr);
1870                                        let new_key = upstream_key(new_port, current_addr);
1871                                        match wiring.rewire_by_name(
1872                                            &sender_port_name,
1873                                            &old_key,
1874                                            &new_key,
1875                                        ) {
1876                                            Ok(()) => current_upstream = new_port.to_string(),
1877                                            Err(e) => {
1878                                                eprintln!("NDArrayPort rewire failed: {e}")
1879                                            }
1880                                        }
1881                                    }
1882                                }
1883                            }
1884                            let snapshot = PluginParamSnapshot {
1885                                enable_callbacks: enabled.load(Ordering::Acquire),
1886                                reason,
1887                                addr,
1888                                value,
1889                            };
1890                            let (process_output, senders, port) = {
1891                                let mut guard = shared.lock();
1892                                let t0 = std::time::Instant::now();
1893                                let result = guard.processor.on_param_change(reason, &snapshot);
1894                                let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1895                                let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1896                                    Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
1897                                } else {
1898                                    None
1899                                };
1900                                let senders = guard.output.lock().senders_clone();
1901                                (output, senders, guard.port_handle.clone())
1902                            };
1903                            if let Some(po) = process_output {
1904                                po.publish_arrays(&senders).await;
1905                                po.batch.flush(&port).await;
1906                            }
1907                        }
1908                        None => break,
1909                    }
1910                }
1911                _ = sort_flush_interval.tick(), if sort_flush_active => {
1912                    // B3: drain head-first while contiguous or past the
1913                    // staleness deadline — NOT the whole buffer.
1914                    let (output, senders, port) = {
1915                        let mut guard = shared.lock();
1916                        let output = guard.tick_sort_buffer();
1917                        let senders = guard.output.lock().senders_clone();
1918                        let port = guard.port_handle.clone();
1919                        (output, senders, port)
1920                    };
1921                    output.publish_arrays(&senders).await;
1922                    output.batch.flush(&port).await;
1923                }
1924            }
1925        }
1926    });
1927}
1928
1929/// Connect a downstream plugin's sender to a plugin runtime's output.
1930///
1931/// B13: the upstream's `array_output` is the same `Arc` that every
1932/// `create_plugin_runtime*` entry point registers in the `WiringRegistry`, so
1933/// adding a sender here mutates the registry-tracked output — the registry
1934/// remains the single source of truth for `rewire_by_name`.
1935pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1936    upstream.array_output().lock().add(downstream_sender);
1937}
1938
1939/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
1940pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1941    port_name: &str,
1942    mut processor: P,
1943    pool: Arc<NDArrayPool>,
1944    queue_size: usize,
1945    output: NDArrayOutput,
1946    ndarray_port: &str,
1947    wiring: Arc<WiringRegistry>,
1948) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1949    // B14: unbounded so control-plane param changes (e.g. autosave restoring
1950    // hundreds of PVs at IOC init) are never silently dropped before the
1951    // data plane sees them.
1952    let (param_tx, param_rx) =
1953        tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
1954
1955    let plugin_type_name = processor.plugin_type().to_string();
1956    let compression_aware = processor.compression_aware();
1957    let array_data = processor.array_data_handle();
1958    let driver = PluginPortDriver::new(
1959        port_name,
1960        &plugin_type_name,
1961        queue_size,
1962        ndarray_port,
1963        1,
1964        param_tx,
1965        &mut processor,
1966        array_data,
1967    )
1968    .expect("failed to create plugin port driver");
1969
1970    let ndarray_params = driver.ndarray_params;
1971    let plugin_params = driver.plugin_params;
1972    let std_array_data_param = driver.std_array_data_param;
1973
1974    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1975
1976    let port_handle = port_runtime.port_handle().clone();
1977
1978    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1979
1980    let enabled = Arc::new(AtomicBool::new(false));
1981    let blocking_mode = Arc::new(AtomicBool::new(false));
1982
1983    let array_output = Arc::new(parking_lot::Mutex::new(output));
1984    let array_output_for_handle = array_output.clone();
1985    // B13: register this plugin's output so the WiringRegistry is the single
1986    // source of truth — an output created via this entry point is otherwise
1987    // invisible to runtime rewiring.
1988    wiring.register_output(port_name, array_output.clone());
1989    // G1/B1: DroppedArrays counter shared with upstream senders.
1990    let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
1991    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1992        processor,
1993        output: array_output,
1994        pool,
1995        ndarray_params,
1996        plugin_params,
1997        port_handle,
1998        array_counter: 0,
1999        std_array_data_param,
2000        min_callback_time: 0.0,
2001        last_process_time: None,
2002        sort_mode: 0,
2003        sort_time: 0.0,
2004        sort_size: 10,
2005        sort_buffer: SortBuffer::new(),
2006        dropped_arrays: dropped_arrays_counter,
2007        compression_aware,
2008        max_byte_rate: 0.0,
2009        throttler: super::throttler::Throttler::new(0.0),
2010        prev_input_array: None,
2011        dims_prev: Vec::new(),
2012        nd_array_addr: 0,
2013        max_threads: 1,
2014        num_threads: 1,
2015    }));
2016
2017    let data_enabled = enabled.clone();
2018    let data_blocking = blocking_mode.clone();
2019
2020    let mut array_sender = array_sender;
2021    array_sender.set_mode_flags(enabled, blocking_mode);
2022
2023    // Capture wiring info for data loop
2024    let sender_port_name = port_name.to_string();
2025    let initial_upstream = ndarray_port.to_string();
2026
2027    let data_jh = thread::Builder::new()
2028        .name(format!("plugin-data-{port_name}"))
2029        .spawn(move || {
2030            plugin_data_loop(
2031                shared,
2032                array_rx,
2033                param_rx,
2034                plugin_params,
2035                ndarray_params.array_counter,
2036                data_enabled,
2037                data_blocking,
2038                sender_port_name,
2039                initial_upstream,
2040                wiring,
2041            );
2042        })
2043        .expect("failed to spawn plugin data thread");
2044
2045    let handle = PluginRuntimeHandle {
2046        port_runtime,
2047        array_sender,
2048        array_output: array_output_for_handle,
2049        port_name: port_name.to_string(),
2050        ndarray_params,
2051        plugin_params,
2052    };
2053
2054    (handle, data_jh)
2055}
2056
2057#[cfg(test)]
2058mod tests {
2059    use super::*;
2060    use crate::ndarray::{NDDataType, NDDimension};
2061    use crate::plugin::channel::ndarray_channel;
2062
2063    /// Passthrough processor: returns the input array as-is.
2064    struct PassthroughProcessor;
2065
2066    impl NDPluginProcess for PassthroughProcessor {
2067        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2068            ProcessResult::arrays(vec![Arc::new(array.clone())])
2069        }
2070        fn plugin_type(&self) -> &str {
2071            "Passthrough"
2072        }
2073    }
2074
2075    /// Sink processor: consumes arrays, returns nothing.
2076    struct SinkProcessor {
2077        count: usize,
2078    }
2079
2080    impl NDPluginProcess for SinkProcessor {
2081        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2082            self.count += 1;
2083            ProcessResult::empty()
2084        }
2085        fn plugin_type(&self) -> &str {
2086            "Sink"
2087        }
2088    }
2089
2090    fn make_test_array(id: i32) -> Arc<NDArray> {
2091        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
2092        arr.unique_id = id;
2093        Arc::new(arr)
2094    }
2095
2096    fn test_wiring() -> Arc<WiringRegistry> {
2097        Arc::new(WiringRegistry::new())
2098    }
2099
2100    /// Enable callbacks on a plugin handle (plugins default to disabled).
2101    fn enable_callbacks(handle: &PluginRuntimeHandle) {
2102        handle
2103            .port_runtime()
2104            .port_handle()
2105            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
2106            .unwrap();
2107        std::thread::sleep(std::time::Duration::from_millis(10));
2108    }
2109
2110    /// Send an array via the sender from a sync test context.
2111    /// Uses a dedicated thread with a current-thread runtime to avoid
2112    /// interfering with the plugin's own runtime.
2113    fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
2114        let sender = sender.clone();
2115        let jh = std::thread::spawn(move || {
2116            let rt = tokio::runtime::Builder::new_current_thread()
2117                .enable_all()
2118                .build()
2119                .unwrap();
2120            rt.block_on(sender.publish(array));
2121        });
2122        jh.join().unwrap();
2123    }
2124
2125    #[test]
2126    fn test_passthrough_runtime() {
2127        let pool = Arc::new(NDArrayPool::new(1_000_000));
2128
2129        // Create downstream receiver
2130        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2131        let mut output = NDArrayOutput::new();
2132        output.add(downstream_sender);
2133
2134        let (handle, _data_jh) = create_plugin_runtime_with_output(
2135            "PASS1",
2136            PassthroughProcessor,
2137            pool,
2138            10,
2139            output,
2140            "",
2141            test_wiring(),
2142        );
2143        enable_callbacks(&handle);
2144
2145        // Send an array
2146        send_array(handle.array_sender(), make_test_array(42));
2147
2148        // Should come out the other side
2149        let received = downstream_rx.blocking_recv().unwrap();
2150        assert_eq!(received.unique_id, 42);
2151    }
2152
2153    #[test]
2154    fn test_sink_runtime() {
2155        let pool = Arc::new(NDArrayPool::new(1_000_000));
2156
2157        let (handle, _data_jh) = create_plugin_runtime(
2158            "SINK1",
2159            SinkProcessor { count: 0 },
2160            pool,
2161            10,
2162            "",
2163            test_wiring(),
2164        );
2165        enable_callbacks(&handle);
2166
2167        // Send arrays - they should be consumed silently
2168        send_array(handle.array_sender(), make_test_array(1));
2169        send_array(handle.array_sender(), make_test_array(2));
2170
2171        // Give processing thread time
2172        std::thread::sleep(std::time::Duration::from_millis(50));
2173
2174        // No crash, no output needed
2175        assert_eq!(handle.port_name(), "SINK1");
2176    }
2177
2178    #[test]
2179    fn test_plugin_type_param() {
2180        let pool = Arc::new(NDArrayPool::new(1_000_000));
2181
2182        let (handle, _data_jh) = create_plugin_runtime(
2183            "TYPE_TEST",
2184            PassthroughProcessor,
2185            pool,
2186            10,
2187            "",
2188            test_wiring(),
2189        );
2190
2191        // Verify port name
2192        assert_eq!(handle.port_name(), "TYPE_TEST");
2193        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
2194    }
2195
2196    #[test]
2197    fn test_shutdown_on_handle_drop() {
2198        let pool = Arc::new(NDArrayPool::new(1_000_000));
2199
2200        let (handle, data_jh) = create_plugin_runtime(
2201            "SHUTDOWN_TEST",
2202            PassthroughProcessor,
2203            pool,
2204            10,
2205            "",
2206            test_wiring(),
2207        );
2208
2209        // Drop the handle (closes sender channel, which should cause data thread to exit)
2210        let sender = handle.array_sender().clone();
2211        drop(handle);
2212        drop(sender);
2213
2214        // Data thread should terminate
2215        let result = data_jh.join();
2216        assert!(result.is_ok());
2217    }
2218
2219    #[test]
2220    fn test_wire_to_nonzero_ndarray_addr() {
2221        // G6: a multi-address upstream plugin registers its output under every
2222        // address in 0..max_addr. A downstream consumer must be able to select
2223        // NDArrayAddr=1 and actually receive arrays — previously the output was
2224        // registered under the bare port name only, so the "PORT:1" key was
2225        // missing and rewire failed with "not found".
2226        use crate::plugin::wiring::upstream_key;
2227        let pool = Arc::new(NDArrayPool::new(1_000_000));
2228        let wiring = test_wiring();
2229
2230        // Upstream plugin advertises 2 addresses.
2231        let (up_handle, _up_jh) = create_plugin_runtime_multi_addr(
2232            "UP_MULTI",
2233            PassthroughProcessor,
2234            pool,
2235            10,
2236            "",
2237            wiring.clone(),
2238            2,
2239        );
2240        enable_callbacks(&up_handle);
2241
2242        // The "PORT:1" key must resolve to the same output as the bare port.
2243        let addr0 = wiring.lookup_output("UP_MULTI");
2244        let addr1 = wiring.lookup_output(&upstream_key("UP_MULTI", 1));
2245        assert!(addr0.is_some(), "addr 0 output must be registered");
2246        assert!(
2247            addr1.is_some(),
2248            "addr 1 output must be registered for a max_addr=2 port"
2249        );
2250
2251        // Wire a downstream consumer to UP_MULTI address 1.
2252        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWN_ADDR1", 10);
2253        wiring
2254            .rewire(&downstream_sender, "", &upstream_key("UP_MULTI", 1))
2255            .expect("wiring a consumer to NDArrayAddr=1 must succeed");
2256
2257        // An array sent through the upstream must reach the addr-1 consumer.
2258        send_array(up_handle.array_sender(), make_test_array(99));
2259        let received = downstream_rx.blocking_recv().unwrap();
2260        assert_eq!(
2261            received.unique_id, 99,
2262            "consumer wired to NDArrayAddr=1 must receive upstream arrays"
2263        );
2264    }
2265
2266    #[test]
2267    fn test_nonblocking_passthrough() {
2268        let pool = Arc::new(NDArrayPool::new(1_000_000));
2269        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2270        let mut output = NDArrayOutput::new();
2271        output.add(downstream_sender);
2272
2273        let (handle, _data_jh) = create_plugin_runtime_with_output(
2274            "NB_TEST",
2275            PassthroughProcessor,
2276            pool,
2277            10,
2278            output,
2279            "",
2280            test_wiring(),
2281        );
2282        enable_callbacks(&handle);
2283
2284        send_array(handle.array_sender(), make_test_array(42));
2285
2286        let received = downstream_rx.blocking_recv().unwrap();
2287        assert_eq!(received.unique_id, 42);
2288    }
2289
2290    #[test]
2291    fn test_blocking_to_nonblocking_switch() {
2292        let pool = Arc::new(NDArrayPool::new(1_000_000));
2293        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2294        let mut output = NDArrayOutput::new();
2295        output.add(downstream_sender);
2296
2297        let (handle, _data_jh) = create_plugin_runtime_with_output(
2298            "SWITCH_TEST",
2299            PassthroughProcessor,
2300            pool,
2301            10,
2302            output,
2303            "",
2304            test_wiring(),
2305        );
2306        enable_callbacks(&handle);
2307
2308        // Start in blocking mode
2309        handle
2310            .port_runtime()
2311            .port_handle()
2312            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
2313            .unwrap();
2314        std::thread::sleep(std::time::Duration::from_millis(50));
2315
2316        send_array(handle.array_sender(), make_test_array(1));
2317        let received = downstream_rx.blocking_recv().unwrap();
2318        assert_eq!(received.unique_id, 1);
2319
2320        // Switch back to non-blocking
2321        handle
2322            .port_runtime()
2323            .port_handle()
2324            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
2325            .unwrap();
2326        std::thread::sleep(std::time::Duration::from_millis(50));
2327
2328        // Send in non-blocking mode — goes through channel to data thread
2329        send_array(handle.array_sender(), make_test_array(2));
2330        let received = downstream_rx.blocking_recv().unwrap();
2331        assert_eq!(received.unique_id, 2);
2332    }
2333
2334    #[test]
2335    fn test_enable_callbacks_disables_processing() {
2336        let pool = Arc::new(NDArrayPool::new(1_000_000));
2337        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2338        let mut output = NDArrayOutput::new();
2339        output.add(downstream_sender);
2340
2341        let (handle, _data_jh) = create_plugin_runtime_with_output(
2342            "ENABLE_TEST",
2343            PassthroughProcessor,
2344            pool,
2345            10,
2346            output,
2347            "",
2348            test_wiring(),
2349        );
2350
2351        // Disable callbacks
2352        handle
2353            .port_runtime()
2354            .port_handle()
2355            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
2356            .unwrap();
2357        std::thread::sleep(std::time::Duration::from_millis(50));
2358
2359        // Send array — should be silently dropped by sender (callbacks disabled)
2360        send_array(handle.array_sender(), make_test_array(99));
2361
2362        // Verify nothing received (with timeout)
2363        let rt = tokio::runtime::Builder::new_current_thread()
2364            .enable_all()
2365            .build()
2366            .unwrap();
2367        let result = rt.block_on(async {
2368            tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
2369        });
2370        assert!(
2371            result.is_err(),
2372            "should not receive array when callbacks disabled"
2373        );
2374    }
2375
2376    #[test]
2377    fn test_downstream_receives_multiple() {
2378        let pool = Arc::new(NDArrayPool::new(1_000_000));
2379
2380        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
2381        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
2382        let mut output = NDArrayOutput::new();
2383        output.add(ds1);
2384        output.add(ds2);
2385
2386        let (handle, _data_jh) = create_plugin_runtime_with_output(
2387            "DS_TEST",
2388            PassthroughProcessor,
2389            pool,
2390            10,
2391            output,
2392            "",
2393            test_wiring(),
2394        );
2395        enable_callbacks(&handle);
2396
2397        send_array(handle.array_sender(), make_test_array(77));
2398
2399        // Both downstream receivers should have the array
2400        let r1 = rx1.blocking_recv().unwrap();
2401        let r2 = rx2.blocking_recv().unwrap();
2402        assert_eq!(r1.unique_id, 77);
2403        assert_eq!(r2.unique_id, 77);
2404    }
2405
2406    #[test]
2407    fn test_param_updates_after_send() {
2408        let pool = Arc::new(NDArrayPool::new(1_000_000));
2409
2410        struct ParamTracker;
2411        impl NDPluginProcess for ParamTracker {
2412            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2413                ProcessResult::arrays(vec![Arc::new(array.clone())])
2414            }
2415            fn plugin_type(&self) -> &str {
2416                "ParamTracker"
2417            }
2418        }
2419
2420        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2421        let mut output = NDArrayOutput::new();
2422        output.add(downstream_sender);
2423
2424        let (handle, _data_jh) = create_plugin_runtime_with_output(
2425            "PARAM_TEST",
2426            ParamTracker,
2427            pool,
2428            10,
2429            output,
2430            "",
2431            test_wiring(),
2432        );
2433        enable_callbacks(&handle);
2434
2435        // Send array
2436        send_array(handle.array_sender(), make_test_array(1));
2437        let received = downstream_rx.blocking_recv().unwrap();
2438        assert_eq!(received.unique_id, 1);
2439
2440        // Write enable_callbacks — should not crash
2441        handle
2442            .port_runtime()
2443            .port_handle()
2444            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
2445            .unwrap();
2446        std::thread::sleep(std::time::Duration::from_millis(50));
2447
2448        // Still works after param update
2449        send_array(handle.array_sender(), make_test_array(2));
2450        let received = downstream_rx.blocking_recv().unwrap();
2451        assert_eq!(received.unique_id, 2);
2452    }
2453
2454    #[test]
2455    fn test_sort_buffer_reorders_by_unique_id() {
2456        let mut buf = SortBuffer::new();
2457
2458        // Insert out of order: 3, 1, 2
2459        buf.insert(3, vec![make_test_array(3)], 10);
2460        buf.insert(1, vec![make_test_array(1)], 10);
2461        buf.insert(2, vec![make_test_array(2)], 10);
2462
2463        assert_eq!(buf.len(), 3);
2464
2465        let drained = buf.drain_all();
2466        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2467        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
2468        assert_eq!(buf.len(), 0);
2469        assert_eq!(buf.prev_unique_id, 3);
2470    }
2471
2472    #[test]
2473    fn test_sort_buffer_drain_ready_contiguous() {
2474        // B3: drain_ready releases the head while the next-expected uniqueId
2475        // is contiguous, even when later ids are still missing.
2476        let mut buf = SortBuffer::new();
2477        // Mark a prior emission (prev=0) so the contiguity path is active;
2478        // C++ only uses the deadline for the very first output array.
2479        buf.note_emitted(0);
2480        buf.insert(1, vec![make_test_array(1)], 10);
2481        buf.insert(2, vec![make_test_array(2)], 10);
2482        buf.insert(5, vec![make_test_array(5)], 10); // gap: 3,4 missing
2483
2484        // sort_time large → only contiguity drives release.
2485        let drained = buf.drain_ready(100.0);
2486        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2487        assert_eq!(ids, vec![1, 2], "contiguous run released; id=5 held by gap");
2488        assert_eq!(buf.len(), 1);
2489    }
2490
2491    #[test]
2492    fn test_sort_buffer_drain_ready_deadline() {
2493        // B3: a stale head is released past sort_time even with a gap.
2494        let mut buf = SortBuffer::new();
2495        buf.note_emitted(1); // prev=1
2496        buf.insert(5, vec![make_test_array(5)], 10); // out of order
2497        std::thread::sleep(std::time::Duration::from_millis(30));
2498        // sort_time=0.01s → head aged past deadline → released.
2499        let drained = buf.drain_ready(0.01);
2500        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2501        assert_eq!(ids, vec![5], "stale head released via deadline");
2502    }
2503
2504    #[test]
2505    fn test_sort_buffer_detects_disordered_on_emit() {
2506        // B4: disorder is counted at emission time.
2507        let mut buf = SortBuffer::new();
2508        buf.note_emitted(5); // prev=5, first_output now false
2509        buf.note_emitted(3); // 3 != 5 and != 6 → disordered
2510        assert_eq!(buf.disordered_arrays, 1);
2511        buf.note_emitted(4); // 4 != 3 and != 4? 4 == prev+1 → ordered
2512        assert_eq!(buf.disordered_arrays, 1);
2513    }
2514
2515    #[test]
2516    fn test_sort_buffer_drops_when_full() {
2517        let mut buf = SortBuffer::new();
2518
2519        // sort_size=2: third insert is refused.
2520        assert!(buf.insert(1, vec![make_test_array(1)], 2));
2521        assert!(buf.insert(2, vec![make_test_array(2)], 2));
2522        assert!(!buf.insert(3, vec![make_test_array(3)], 2));
2523
2524        assert_eq!(buf.len(), 2);
2525        assert_eq!(buf.dropped_output_arrays, 1);
2526    }
2527
2528    #[test]
2529    fn test_sort_mode_runtime_integration() {
2530        let pool = Arc::new(NDArrayPool::new(1_000_000));
2531        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2532        let mut output = NDArrayOutput::new();
2533        output.add(downstream_sender);
2534
2535        let (handle, _data_jh) = create_plugin_runtime_with_output(
2536            "SORT_TEST",
2537            PassthroughProcessor,
2538            pool,
2539            10,
2540            output,
2541            "",
2542            test_wiring(),
2543        );
2544        enable_callbacks(&handle);
2545
2546        // Enable sort mode with sort_size=10 and a sort_time deadline.
2547        handle
2548            .port_runtime()
2549            .port_handle()
2550            .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
2551            .unwrap();
2552        handle
2553            .port_runtime()
2554            .port_handle()
2555            .write_float64_blocking(handle.plugin_params.sort_time, 0, 0.1)
2556            .unwrap();
2557        handle
2558            .port_runtime()
2559            .port_handle()
2560            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
2561            .unwrap();
2562        std::thread::sleep(std::time::Duration::from_millis(50));
2563
2564        // B2: in-order arrays (1,2,3) must be emitted IMMEDIATELY via the
2565        // fast path — they are NOT delayed by the sort buffer.
2566        send_array(handle.array_sender(), make_test_array(1));
2567        send_array(handle.array_sender(), make_test_array(2));
2568        send_array(handle.array_sender(), make_test_array(3));
2569
2570        let rt = tokio::runtime::Builder::new_current_thread()
2571            .enable_all()
2572            .build()
2573            .unwrap();
2574        let fast = rt.block_on(async {
2575            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
2576        });
2577        assert!(
2578            fast.is_ok(),
2579            "in-order arrays must be emitted immediately, not buffered"
2580        );
2581        assert_eq!(fast.unwrap().unwrap().unique_id, 1);
2582        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 2);
2583        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 3);
2584
2585        // B3: now send out of order (5 before 4). prev=3, so 4 is in-order
2586        // and emitted immediately; 5 arrives first, is buffered, then 4
2587        // unblocks it.
2588        send_array(handle.array_sender(), make_test_array(5));
2589        send_array(handle.array_sender(), make_test_array(4));
2590        std::thread::sleep(std::time::Duration::from_millis(50));
2591        // 4 emitted immediately (in order), then 5 released by contiguity.
2592        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 4);
2593        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 5);
2594    }
2595
2596    #[test]
2597    fn test_throttle_drops_output_arrays() {
2598        // G7: with a tiny MaxByteRate, output arrays exceeding the byte budget
2599        // are dropped and counted into DroppedOutputArrays.
2600        let pool = Arc::new(NDArrayPool::new(1_000_000));
2601        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2602        let mut output = NDArrayOutput::new();
2603        output.add(downstream_sender);
2604
2605        let (handle, _data_jh) = create_plugin_runtime_with_output(
2606            "THROTTLE_TEST",
2607            PassthroughProcessor,
2608            pool,
2609            10,
2610            output,
2611            "",
2612            test_wiring(),
2613        );
2614        enable_callbacks(&handle);
2615
2616        // MaxByteRate = 8 bytes/sec. Each test array is 4 bytes; the bucket
2617        // starts full at 8, so the first two pass and the rest are dropped.
2618        handle
2619            .port_runtime()
2620            .port_handle()
2621            .write_float64_blocking(handle.plugin_params.max_byte_rate, 0, 8.0)
2622            .unwrap();
2623        std::thread::sleep(std::time::Duration::from_millis(20));
2624
2625        for id in 1..=5 {
2626            send_array(handle.array_sender(), make_test_array(id));
2627        }
2628        std::thread::sleep(std::time::Duration::from_millis(50));
2629
2630        // Drain whatever made it through — strictly fewer than 5.
2631        let rt = tokio::runtime::Builder::new_current_thread()
2632            .enable_all()
2633            .build()
2634            .unwrap();
2635        let mut received = 0;
2636        while rt
2637            .block_on(async {
2638                tokio::time::timeout(std::time::Duration::from_millis(20), downstream_rx.recv())
2639                    .await
2640            })
2641            .map(|o| o.is_some())
2642            .unwrap_or(false)
2643        {
2644            received += 1;
2645        }
2646        assert!(
2647            received < 5,
2648            "throttle must drop some arrays (got {received})"
2649        );
2650        assert!(received >= 1, "first array within budget must pass");
2651    }
2652
2653    #[test]
2654    fn test_process_plugin_reprocesses_last_input() {
2655        // G5: writing ProcessPlugin re-injects the cached last input array.
2656        let pool = Arc::new(NDArrayPool::new(1_000_000));
2657        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2658        let mut output = NDArrayOutput::new();
2659        output.add(downstream_sender);
2660
2661        let (handle, _data_jh) = create_plugin_runtime_with_output(
2662            "PROCESS_PLUGIN_TEST",
2663            PassthroughProcessor,
2664            pool,
2665            10,
2666            output,
2667            "",
2668            test_wiring(),
2669        );
2670        enable_callbacks(&handle);
2671
2672        send_array(handle.array_sender(), make_test_array(7));
2673        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 7);
2674
2675        // Trigger ProcessPlugin — the cached input (id=7) is reprocessed.
2676        handle
2677            .port_runtime()
2678            .port_handle()
2679            .write_int32_blocking(handle.plugin_params.process_plugin, 0, 1)
2680            .unwrap();
2681        let reprocessed = downstream_rx.blocking_recv().unwrap();
2682        assert_eq!(
2683            reprocessed.unique_id, 7,
2684            "ProcessPlugin re-emits last input"
2685        );
2686    }
2687
2688    #[test]
2689    fn test_min_callback_time_drop_counts() {
2690        // B5: a MinCallbackTime-throttled array is dropped — verify the data
2691        // loop does not emit it (silent loss is the bug being fixed; the
2692        // DroppedArrays param increment is covered by the integration tests).
2693        let pool = Arc::new(NDArrayPool::new(1_000_000));
2694        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2695        let mut output = NDArrayOutput::new();
2696        output.add(downstream_sender);
2697
2698        let (handle, _data_jh) = create_plugin_runtime_with_output(
2699            "MIN_CB_TEST",
2700            PassthroughProcessor,
2701            pool,
2702            10,
2703            output,
2704            "",
2705            test_wiring(),
2706        );
2707        enable_callbacks(&handle);
2708
2709        // 10s minimum between callbacks — only the first array gets through.
2710        handle
2711            .port_runtime()
2712            .port_handle()
2713            .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 10.0)
2714            .unwrap();
2715        std::thread::sleep(std::time::Duration::from_millis(20));
2716
2717        send_array(handle.array_sender(), make_test_array(1));
2718        send_array(handle.array_sender(), make_test_array(2));
2719        std::thread::sleep(std::time::Duration::from_millis(50));
2720
2721        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 1);
2722        let rt = tokio::runtime::Builder::new_current_thread()
2723            .enable_all()
2724            .build()
2725            .unwrap();
2726        let second = rt.block_on(async {
2727            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
2728        });
2729        assert!(
2730            second.is_err(),
2731            "second array throttled out by MinCallbackTime"
2732        );
2733    }
2734
2735    #[test]
2736    fn test_process_plugin_skips_throttled_input() {
2737        // a MinCallbackTime-throttled frame must NOT be cached as the
2738        // ProcessPlugin input. After array 1 is processed and array 2 is
2739        // dropped by the throttle, ProcessPlugin must re-inject array 1
2740        // (the last *processed* array), not the dropped array 2.
2741        let pool = Arc::new(NDArrayPool::new(1_000_000));
2742        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2743        let mut output = NDArrayOutput::new();
2744        output.add(downstream_sender);
2745
2746        let (handle, _data_jh) = create_plugin_runtime_with_output(
2747            "PROCESS_THROTTLE_TEST",
2748            PassthroughProcessor,
2749            pool,
2750            10,
2751            output,
2752            "",
2753            test_wiring(),
2754        );
2755        enable_callbacks(&handle);
2756
2757        // 10s minimum between callbacks — only the first array is processed.
2758        handle
2759            .port_runtime()
2760            .port_handle()
2761            .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 10.0)
2762            .unwrap();
2763        std::thread::sleep(std::time::Duration::from_millis(20));
2764
2765        send_array(handle.array_sender(), make_test_array(1));
2766        send_array(handle.array_sender(), make_test_array(2));
2767        std::thread::sleep(std::time::Duration::from_millis(50));
2768
2769        // Array 1 was processed and emitted; array 2 was throttled out.
2770        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 1);
2771
2772        // ProcessPlugin re-injects the cached input. The cache must still hold
2773        // array 1, because array 2 never passed the throttle gate. The
2774        // re-injected array itself is also subject to the throttle, so reset
2775        // MinCallbackTime to 0 first so the re-injected frame is processed.
2776        handle
2777            .port_runtime()
2778            .port_handle()
2779            .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 0.0)
2780            .unwrap();
2781        std::thread::sleep(std::time::Duration::from_millis(20));
2782        handle
2783            .port_runtime()
2784            .port_handle()
2785            .write_int32_blocking(handle.plugin_params.process_plugin, 0, 1)
2786            .unwrap();
2787        let reprocessed = downstream_rx.blocking_recv().unwrap();
2788        assert_eq!(
2789            reprocessed.unique_id, 1,
2790            "ProcessPlugin must re-inject the last processed array (1), not the throttled array (2)"
2791        );
2792    }
2793
2794    #[test]
2795    fn test_g3_compressed_array_dropped_on_non_aware_plugin() {
2796        // G3: a non-compression-aware plugin drops a compressed array.
2797        let pool = Arc::new(NDArrayPool::new(1_000_000));
2798        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2799        let mut output = NDArrayOutput::new();
2800        output.add(downstream_sender);
2801
2802        let (handle, _data_jh) = create_plugin_runtime_with_output(
2803            "G3_TEST",
2804            PassthroughProcessor, // compression_aware() defaults to false
2805            pool,
2806            10,
2807            output,
2808            "",
2809            test_wiring(),
2810        );
2811        enable_callbacks(&handle);
2812
2813        // A compressed array must be dropped, not forwarded.
2814        let mut compressed = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
2815        compressed.unique_id = 1;
2816        compressed.codec = Some(crate::codec::Codec {
2817            name: crate::codec::CodecName::JPEG,
2818            compressed_size: 16,
2819            level: 0,
2820            shuffle: 0,
2821            compressor: 0,
2822            original_data_type: NDDataType::UInt8,
2823        });
2824        send_array(handle.array_sender(), Arc::new(compressed));
2825
2826        // An uncompressed array passes through normally.
2827        send_array(handle.array_sender(), make_test_array(2));
2828
2829        let r = downstream_rx.blocking_recv().unwrap();
2830        assert_eq!(
2831            r.unique_id, 2,
2832            "compressed array dropped; only the raw array reaches downstream"
2833        );
2834    }
2835
2836    #[test]
2837    fn test_drop_on_full_increments_dropped_counter() {
2838        // B1/G1: a slow downstream plugin with a tiny input queue drops arrays
2839        // when the queue is full; the drop is counted in the plugin's shared
2840        // DroppedArrays counter rather than back-pressuring the producer.
2841        struct SlowProcessor;
2842        impl NDPluginProcess for SlowProcessor {
2843            fn process_array(&mut self, _a: &NDArray, _p: &NDArrayPool) -> ProcessResult {
2844                std::thread::sleep(std::time::Duration::from_millis(200));
2845                ProcessResult::empty()
2846            }
2847            fn plugin_type(&self) -> &str {
2848                "Slow"
2849            }
2850        }
2851        let pool = Arc::new(NDArrayPool::new(1_000_000));
2852
2853        // Downstream plugin with queue size 1 and a slow processor.
2854        let (downstream_handle, _ds_jh) =
2855            create_plugin_runtime("B1_DOWNSTREAM", SlowProcessor, pool, 1, "", test_wiring());
2856        enable_callbacks(&downstream_handle);
2857        let ds_sender = downstream_handle.array_sender().clone();
2858        let dropped = ds_sender.dropped_arrays_counter().clone();
2859
2860        // First array is taken by the data loop (now sleeping 200ms); second
2861        // fills the 1-slot queue; the rest find a full queue → dropped.
2862        send_array(&ds_sender, make_test_array(1));
2863        send_array(&ds_sender, make_test_array(2));
2864        send_array(&ds_sender, make_test_array(3));
2865        send_array(&ds_sender, make_test_array(4));
2866
2867        assert!(
2868            dropped.load(Ordering::Acquire) >= 1,
2869            "arrays dropped on a full queue must be counted (got {})",
2870            dropped.load(Ordering::Acquire)
2871        );
2872    }
2873
2874    #[test]
2875    fn test_cross_width_narrowing_array_read_truncates() {
2876        // Cross-width integer narrowing array reads must TRUNCATE (wrapping),
2877        // matching the C cast in C++ NDArrayPool.cpp:388 `convertType`
2878        //   *pDataOut++ = (dataTypeOut)(*pDataIn++);
2879        // A C cast `(epicsInt8)(epicsUInt16)300` keeps the low 8 bits == 44.
2880        // The f64 round-trip in copy_convert would SATURATE (`300.0 as i8`
2881        // == 127) and diverge from C++ — copy_ccast must be used instead.
2882
2883        // U16 -> i8: 300 = 0x012C; low byte 0x2C = 44.
2884        let mut out = [0i8; 1];
2885        let n = copy_ccast(&[300u16], &mut out);
2886        assert_eq!(n, 1);
2887        assert_eq!(out[0], 44, "(epicsInt8)(epicsUInt16)300 == 44 (low 8 bits)");
2888        // copy_convert would have saturated:
2889        let mut sat = [0i8; 1];
2890        copy_convert(&[300u16], &mut sat);
2891        assert_eq!(sat[0], 127, "f64 round-trip saturates — the wrong behavior");
2892
2893        // I32 -> i8: 0x1234_5678 -> low byte 0x78 = 120.
2894        let mut out2 = [0i8; 1];
2895        copy_ccast(&[0x1234_5678i32], &mut out2);
2896        assert_eq!(out2[0], 0x78);
2897
2898        // I32 -> i8: -1 stays -1 (all-ones low byte).
2899        let mut out3 = [0i8; 1];
2900        copy_ccast(&[-1i32], &mut out3);
2901        assert_eq!(out3[0], -1);
2902
2903        // U16 -> i8: 0x00FF = 255 -> low byte 0xFF reinterpreted as i8 == -1.
2904        let mut out4 = [0i8; 1];
2905        copy_ccast(&[255u16], &mut out4);
2906        assert_eq!(out4[0], -1);
2907
2908        // I64 -> i32: 0x0000_0001_0000_002A -> low 32 bits == 42.
2909        let mut out5 = [0i32; 1];
2910        copy_ccast(&[0x0000_0001_0000_002Ai64], &mut out5);
2911        assert_eq!(out5[0], 42);
2912
2913        // U32 -> i16: 70000 = 0x0001_1170 -> low 16 bits 0x1170 == 4464.
2914        let mut out6 = [0i16; 1];
2915        copy_ccast(&[70000u32], &mut out6);
2916        assert_eq!(out6[0], 4464);
2917
2918        // Same-width sign change still works as a bitwise reinterpret:
2919        // U8 255 -> i8 -1.
2920        let mut out7 = [0i8; 1];
2921        copy_ccast(&[255u8], &mut out7);
2922        assert_eq!(out7[0], -1);
2923
2924        // F64 out-of-range -> i32 still routes through copy_convert (the
2925        // `convert:` arm for float sources). C++ converts float->int with a
2926        // C cast too, but the runtime keeps the f64 numeric path for float
2927        // sources; this asserts the integer-narrowing fix did not change the
2928        // float-source path.
2929        let mut fout = [0i32; 1];
2930        copy_convert(&[42.9f64], &mut fout);
2931        assert_eq!(fout[0], 42, "f64 -> i32 truncates toward zero");
2932    }
2933}