// ad_core_rs/plugin/runtime.rs
1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
19use super::params::PluginBaseParams;
20use super::wiring::WiringRegistry;
21
/// Value sent through the param change channel from control plane to data plane.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    Int32(i32),
    Float64(f64),
    Octet(String),
}

impl ParamChangeValue {
    /// Coerce to i32: floats truncate via `as`, string values yield 0.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(v) => v,
            Self::Float64(v) => v as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Coerce to f64: ints widen via `as`, string values yield 0.0.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(v) => v as f64,
            Self::Float64(v) => v,
            Self::Octet(_) => 0.0,
        }
    }

    /// Borrow the string payload if this is an `Octet` value.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(s) = self {
            Some(s.as_str())
        } else {
            None
        }
    }
}
54
/// A single parameter update produced by a plugin's process_array.
pub enum ParamUpdate {
    Int32 { reason: usize, addr: i32, value: i32 },
    Float64 { reason: usize, addr: i32, value: f64 },
    Octet { reason: usize, addr: i32, value: String },
    Float64Array { reason: usize, addr: i32, value: Vec<f64> },
}

impl ParamUpdate {
    /// Create an Int32 update at addr 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Create a Float64 update at addr 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Create an Int32 update at a specific addr.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 { reason, addr, value }
    }

    /// Create a Float64 update at a specific addr.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 { reason, addr, value }
    }

    /// Create a Float64Array update at addr 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Create a Float64Array update at a specific addr.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array { reason, addr, value }
    }
}
129
/// Result of processing one array: output arrays + param updates to write back.
pub struct ProcessResult {
    /// Arrays to publish downstream (empty for sink plugins).
    pub output_arrays: Vec<Arc<NDArray>>,
    /// Param updates to be written back to the port driver.
    pub param_updates: Vec<ParamUpdate>,
    /// If set, only publish to the subscriber at this index (round-robin scatter).
    pub scatter_index: Option<usize>,
}
137
138impl ProcessResult {
139    /// Convenience: sink plugin with only param updates, no output arrays.
140    pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
141        Self {
142            output_arrays: vec![],
143            param_updates,
144            scatter_index: None,
145        }
146    }
147
148    /// Convenience: passthrough/transform plugin with output arrays but no param updates.
149    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
150        Self {
151            output_arrays,
152            param_updates: vec![],
153            scatter_index: None,
154        }
155    }
156
157    /// Convenience: no outputs, no param updates.
158    pub fn empty() -> Self {
159        Self {
160            output_arrays: vec![],
161            param_updates: vec![],
162            scatter_index: None,
163        }
164    }
165
166    /// Convenience: scatter output — send to a single subscriber by index.
167    pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
168        Self {
169            output_arrays,
170            param_updates: vec![],
171            scatter_index: Some(index),
172        }
173    }
174}
175
/// Result of handling a control-plane param change.
pub struct ParamChangeResult {
    /// Arrays to publish downstream as a result of the change.
    pub output_arrays: Vec<Arc<NDArray>>,
    /// Param updates to be written back to the port driver.
    pub param_updates: Vec<ParamUpdate>,
}
181
182impl ParamChangeResult {
183    pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
184        Self {
185            output_arrays: vec![],
186            param_updates,
187        }
188    }
189
190    pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
191        Self {
192            output_arrays,
193            param_updates: vec![],
194        }
195    }
196
197    pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
198        Self {
199            output_arrays,
200            param_updates,
201        }
202    }
203
204    pub fn empty() -> Self {
205        Self {
206            output_arrays: vec![],
207            param_updates: vec![],
208        }
209    }
210}
211
/// Pure processing logic. No threading concerns.
pub trait NDPluginProcess: Send + 'static {
    /// Process one array. Return output arrays and param updates.
    /// `pool` can be used to allocate output NDArrays.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Plugin type name for PLUGIN_TYPE param.
    fn plugin_type(&self) -> &str;

    /// Register plugin-specific params on the base. Called once during construction.
    /// Default implementation registers nothing.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called when a param changes. Reason is the param index.
    /// Return param updates to be written back to the port driver.
    /// Default implementation is a no-op (no arrays, no updates).
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Return a handle to the latest NDArray data for array reads.
    /// Override this in plugins like NDPluginStdArrays that serve pixel data
    /// via readInt8Array/readInt16Array/etc.
    /// Default implementation returns `None` (plugin serves no raw array data).
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
245
/// Read-only snapshot of param values available to the processing thread.
pub struct PluginParamSnapshot {
    /// Current value of the EnableCallbacks param.
    pub enable_callbacks: bool,
    /// The param reason that changed.
    pub reason: usize,
    /// The address (sub-device) that changed.
    pub addr: i32,
    /// The new value.
    pub value: ParamChangeValue,
}
256
/// Sort buffer for reordering output arrays by uniqueId.
///
/// When sort_mode is enabled, output arrays are inserted into a BTreeMap
/// keyed by uniqueId instead of being sent directly. A periodic flush task
/// drains arrays in uniqueId order.
struct SortBuffer {
    /// Buffered arrays keyed by uniqueId, ordered by BTreeMap.
    /// Several arrays may share one uniqueId (hence the Vec values).
    entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
    /// The last uniqueId that was emitted (for detecting disordered arrays).
    last_emitted_id: i32,
    /// Counter of arrays received out of order (uniqueId < last_emitted_id).
    disordered_arrays: i32,
    /// Counter of arrays dropped because the buffer was full.
    /// Note: incremented once per evicted uniqueId entry, which may hold
    /// more than one array.
    dropped_output_arrays: i32,
}
272
273impl SortBuffer {
274    fn new() -> Self {
275        Self {
276            entries: BTreeMap::new(),
277            last_emitted_id: 0,
278            disordered_arrays: 0,
279            dropped_output_arrays: 0,
280        }
281    }
282
283    /// Insert arrays into the sort buffer. If buffer exceeds sort_size, drop oldest entries.
284    fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
285        if unique_id < self.last_emitted_id {
286            self.disordered_arrays += 1;
287        }
288        self.entries.entry(unique_id).or_default().extend(arrays);
289
290        // Enforce sort_size limit by dropping oldest entries
291        while sort_size > 0 && self.entries.len() as i32 > sort_size {
292            if let Some((&oldest_key, _)) = self.entries.iter().next() {
293                self.entries.remove(&oldest_key);
294                self.dropped_output_arrays += 1;
295            }
296        }
297    }
298
299    /// Drain all buffered arrays in uniqueId order. Returns them as (uniqueId, arrays) pairs.
300    fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
301        let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
302        if let Some(&(last_id, _)) = entries.last() {
303            self.last_emitted_id = last_id;
304        }
305        entries
306    }
307
308    /// Number of uniqueId entries currently buffered.
309    fn len(&self) -> i32 {
310        self.entries.len() as i32
311    }
312}
313
/// Shared processor state protected by a mutex, accessible from both
/// the data thread (non-blocking mode) and the caller thread (blocking mode).
struct SharedProcessorInner<P: NDPluginProcess> {
    /// The plugin's pure processing logic.
    processor: P,
    /// Shared handle to the plugin's NDArray output (presumably the
    /// downstream sender set — see the channel module; not used in this impl).
    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Pool handed to `process_array` for allocating output NDArrays.
    pool: Arc<NDArrayPool>,
    /// Param indices for the common NDArray driver params.
    ndarray_params: NDArrayDriverParams,
    /// Param indices for the plugin base params.
    plugin_params: PluginBaseParams,
    /// Handle to the control-plane port (interrupt notify + param writes).
    port_handle: PortHandle,
    /// Running count of processed arrays, reported as ArrayCounter.
    array_counter: i32,
    /// Param index for STD_ARRAY_DATA (if this is a StdArrays plugin).
    std_array_data_param: Option<usize>,
    /// MinCallbackTime throttling: minimum seconds between process calls.
    min_callback_time: f64,
    /// Last time process_and_publish was called (for throttling).
    last_process_time: Option<std::time::Instant>,
    /// Sort mode: 0 = disabled, 1 = sorted output.
    sort_mode: i32,
    /// Sort time: seconds between periodic flushes of the sort buffer.
    sort_time: f64,
    /// Sort size: maximum number of uniqueId entries in the sort buffer.
    sort_size: i32,
    /// Sort buffer for reordering output arrays by uniqueId.
    sort_buffer: SortBuffer,
}
339
impl<P: NDPluginProcess> SharedProcessorInner<P> {
    /// True when MinCallbackTime throttling should skip this array:
    /// min_callback_time is positive and the previous process call started
    /// more recently than that interval.
    fn should_throttle(&self) -> bool {
        if self.min_callback_time <= 0.0 {
            return false;
        }
        if let Some(last) = self.last_process_time {
            last.elapsed().as_secs_f64() < self.min_callback_time
        } else {
            false
        }
    }

    /// Process array and return a `ProcessOutput`. Does NOT send to actor.
    /// Direct interrupts (std_array_data_param) happen here (sync).
    /// The returned output must be published and flushed by the caller in async context.
    ///
    /// Returns `None` when throttled; the input array is silently discarded
    /// in that case (no dropped-array counter is updated here).
    fn process_and_publish(&mut self, array: &NDArray) -> Option<ProcessOutput> {
        if self.should_throttle() {
            return None;
        }
        let t0 = std::time::Instant::now();
        let result = self.processor.process_array(array, &self.pool);
        let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
        // Throttle interval is measured from the *start* of processing.
        self.last_process_time = Some(t0);

        if self.sort_mode != 0 && !result.output_arrays.is_empty() {
            // Sorted mode: buffer outputs keyed by the *input* array's uniqueId
            // rather than publishing now; a periodic flush emits them in order.
            let unique_id = array.unique_id;
            self.sort_buffer
                .insert(unique_id, result.output_arrays, self.sort_size);
            let mut batch = self.build_sort_params_batch();
            if !result.param_updates.is_empty() {
                // Param updates still flow immediately (with no arrays attached).
                let frame_output = self.build_publish_batch(
                    vec![],
                    result.param_updates,
                    result.scatter_index,
                    Some(array),
                    elapsed_ms,
                );
                batch.merge(frame_output.batch);
            }
            Some(ProcessOutput {
                arrays: vec![],
                scatter_index: None,
                batch,
            })
        } else {
            Some(self.build_publish_batch(
                result.output_arrays,
                result.param_updates,
                result.scatter_index,
                Some(array),
                elapsed_ms,
            ))
        }
    }

    /// Flush the sort buffer: drain all arrays in uniqueId order.
    fn flush_sort_buffer(&mut self) -> ProcessOutput {
        let entries = self.sort_buffer.drain_all();
        let mut all_arrays = Vec::new();
        let mut combined = ParamBatch::empty();
        for (_unique_id, arrays) in entries {
            // elapsed_ms = 0.0: no fresh processing happens during the flush itself.
            let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
            all_arrays.extend(output.arrays);
            combined.merge(output.batch);
        }
        combined.merge(self.build_sort_params_batch());
        ProcessOutput {
            arrays: all_arrays,
            scatter_index: None,
            batch: combined,
        }
    }

    /// Build the addr-0 batch of sort status params: SortFree (remaining
    /// buffer slots), DisorderedArrays, and DroppedOutputArrays.
    fn build_sort_params_batch(&self) -> ParamBatch {
        use asyn_rs::request::ParamSetValue;
        let sort_free = self.sort_size - self.sort_buffer.len();
        ParamBatch {
            addr0: vec![
                ParamSetValue::Int32 {
                    reason: self.plugin_params.sort_free,
                    addr: 0,
                    value: sort_free,
                },
                ParamSetValue::Int32 {
                    reason: self.plugin_params.disordered_arrays,
                    addr: 0,
                    value: self.sort_buffer.disordered_arrays,
                },
                ParamSetValue::Int32 {
                    reason: self.plugin_params.dropped_output_arrays,
                    addr: 0,
                    value: self.sort_buffer.dropped_output_arrays,
                },
            ],
            extra: std::collections::HashMap::new(),
        }
    }

    /// Build a ProcessOutput: fires direct interrupts (sync) and collects
    /// param updates into a batch. Does NOT publish arrays — the caller
    /// must publish them in async context.
    ///
    /// Array metadata params (counter, sizes, timestamps, ...) are reported
    /// from the first output array, falling back to `fallback_array` when
    /// there are no outputs.
    fn build_publish_batch(
        &mut self,
        output_arrays: Vec<Arc<NDArray>>,
        param_updates: Vec<ParamUpdate>,
        scatter_index: Option<usize>,
        fallback_array: Option<&NDArray>,
        elapsed_ms: f64,
    ) -> ProcessOutput {
        use asyn_rs::request::ParamSetValue;

        // Updates for addr 0 (common case) vs other addresses, flushed separately.
        let mut addr0: Vec<ParamSetValue> = Vec::new();
        let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
            std::collections::HashMap::new();

        if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
            self.array_counter += 1;

            // Fire array data interrupt directly (C EPICS pattern).
            if let Some(param) = self.std_array_data_param {
                use crate::ndarray::NDDataBuffer;
                use asyn_rs::param::ParamValue;
                // Unsigned buffers are reinterpreted into the same-width signed
                // array type via element-wise `as` casts.
                // (Every variant currently maps to Some.)
                let value = match &report_arr.data {
                    NDDataBuffer::I8(v) => {
                        Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
                    }
                    NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
                        v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
                    ))),
                    NDDataBuffer::I16(v) => {
                        Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
                    }
                    NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
                        v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
                    ))),
                    NDDataBuffer::I32(v) => {
                        Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
                    }
                    NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
                        v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
                    ))),
                    NDDataBuffer::I64(v) => {
                        Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
                    }
                    NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
                        v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
                    ))),
                    NDDataBuffer::F32(v) => {
                        Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
                    }
                    NDDataBuffer::F64(v) => {
                        Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
                    }
                };
                if let Some(value) = value {
                    let ts = report_arr.timestamp.to_system_time();
                    self.port_handle
                        .interrupts()
                        .notify(asyn_rs::interrupt::InterruptValue {
                            reason: param,
                            addr: 0,
                            value,
                            timestamp: ts,
                            uint32_changed_mask: 0,
                        });
                }
            }

            let info = report_arr.info();
            // <=2 dims -> 0, otherwise 2; presumably NDColorMode Mono vs RGB1 —
            // TODO(review): confirm against the color mode enum.
            let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
            addr0.extend([
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.array_counter,
                    addr: 0,
                    value: self.array_counter,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.unique_id,
                    addr: 0,
                    value: report_arr.unique_id,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.n_dimensions,
                    addr: 0,
                    value: report_arr.dims.len() as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.array_size_x,
                    addr: 0,
                    value: info.x_size as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.array_size_y,
                    addr: 0,
                    value: info.y_size as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.array_size_z,
                    addr: 0,
                    value: info.color_size as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.array_size,
                    addr: 0,
                    value: info.total_bytes as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.data_type,
                    addr: 0,
                    value: report_arr.data.data_type() as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.color_mode,
                    addr: 0,
                    value: color_mode,
                },
                ParamSetValue::Float64 {
                    reason: self.ndarray_params.timestamp_rbv,
                    addr: 0,
                    value: report_arr.timestamp.as_f64(),
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.epics_ts_sec,
                    addr: 0,
                    value: report_arr.timestamp.sec as i32,
                },
                ParamSetValue::Int32 {
                    reason: self.ndarray_params.epics_ts_nsec,
                    addr: 0,
                    value: report_arr.timestamp.nsec as i32,
                },
            ]);
        }

        addr0.push(ParamSetValue::Float64 {
            reason: self.plugin_params.execution_time,
            addr: 0,
            value: elapsed_ms,
        });

        // ArrayRate_RBV is computed by a calc record in the DB template
        // (SCAN "1 second", reading ArrayCounter_RBV delta), not in Rust.

        // Plugin-specific param updates, split by address.
        for update in &param_updates {
            match update {
                ParamUpdate::Int32 {
                    reason,
                    addr,
                    value,
                } => {
                    let pv = ParamSetValue::Int32 {
                        reason: *reason,
                        addr: *addr,
                        value: *value,
                    };
                    if *addr == 0 {
                        addr0.push(pv);
                    } else {
                        extra.entry(*addr).or_default().push(pv);
                    }
                }
                ParamUpdate::Float64 {
                    reason,
                    addr,
                    value,
                } => {
                    let pv = ParamSetValue::Float64 {
                        reason: *reason,
                        addr: *addr,
                        value: *value,
                    };
                    if *addr == 0 {
                        addr0.push(pv);
                    } else {
                        extra.entry(*addr).or_default().push(pv);
                    }
                }
                ParamUpdate::Octet {
                    reason,
                    addr,
                    value,
                } => {
                    let pv = ParamSetValue::Octet {
                        reason: *reason,
                        addr: *addr,
                        value: value.clone(),
                    };
                    if *addr == 0 {
                        addr0.push(pv);
                    } else {
                        extra.entry(*addr).or_default().push(pv);
                    }
                }
                ParamUpdate::Float64Array {
                    reason,
                    addr,
                    value,
                } => {
                    let pv = ParamSetValue::Float64Array {
                        reason: *reason,
                        addr: *addr,
                        value: value.clone(),
                    };
                    if *addr == 0 {
                        addr0.push(pv);
                    } else {
                        extra.entry(*addr).or_default().push(pv);
                    }
                }
            }
        }

        ProcessOutput {
            arrays: output_arrays,
            scatter_index,
            batch: ParamBatch { addr0, extra },
        }
    }
}
660
/// Output from processing: arrays to publish + param batch to flush.
struct ProcessOutput {
    /// Arrays to publish downstream.
    arrays: Vec<Arc<NDArray>>,
    /// If set, publish only to the subscriber at this index.
    scatter_index: Option<usize>,
    /// Collected param updates to flush to the port.
    batch: ParamBatch,
}
667
668impl ProcessOutput {
669    /// Publish arrays to downstream senders (async, concurrent fan-out).
670    ///
671    /// Each array is published to all senders concurrently (independent
672    /// backpressure per sender). Arrays are published in order — the next
673    /// array's fan-out starts only after the previous one completes.
674    async fn publish_arrays(&self, senders: &[NDArraySender]) {
675        for arr in &self.arrays {
676            if let Some(idx) = self.scatter_index {
677                if let Some(sender) = senders.get(idx % senders.len().max(1)) {
678                    sender.publish(arr.clone()).await;
679                }
680            } else {
681                let futs = senders.iter().map(|s| s.publish(arr.clone()));
682                futures_util::future::join_all(futs).await;
683            }
684        }
685    }
686}
687
/// Collected param updates ready to be flushed to the actor.
/// Produced by `build_publish_batch()`, consumed by async `flush()`.
struct ParamBatch {
    /// Updates for address 0 (the common case; flushed in a single call).
    addr0: Vec<asyn_rs::request::ParamSetValue>,
    /// Updates for non-zero addresses, keyed by address.
    extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
}
694
695impl ParamBatch {
696    fn empty() -> Self {
697        Self {
698            addr0: Vec::new(),
699            extra: std::collections::HashMap::new(),
700        }
701    }
702
703    fn merge(&mut self, other: ParamBatch) {
704        self.addr0.extend(other.addr0);
705        for (addr, updates) in other.extra {
706            self.extra.entry(addr).or_default().extend(updates);
707        }
708    }
709
710    /// Flush via reliable async enqueue. Call from async context.
711    async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
712        if !self.addr0.is_empty() {
713            if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
714                eprintln!("plugin param flush error (addr 0): {e}");
715            }
716        }
717        for (addr, updates) in self.extra {
718            if let Err(e) = port.set_params_and_notify(addr, updates).await {
719                eprintln!("plugin param flush error (addr {addr}): {e}");
720            }
721        }
722    }
723}
724
/// PortDriver implementation for a plugin's control plane.
#[allow(dead_code)]
pub struct PluginPortDriver {
    /// Common port-driver state (params, port flags).
    base: PortDriverBase,
    /// Param indices for the common NDArray driver params.
    ndarray_params: NDArrayDriverParams,
    /// Param indices for the plugin base params.
    plugin_params: PluginBaseParams,
    /// Forwards (reason, addr, value) param changes to the data plane.
    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
    /// Optional handle to the latest NDArray for array read methods (used by StdArrays).
    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    /// Param index for STD_ARRAY_DATA (triggers I/O Intr on ArrayData waveform).
    std_array_data_param: Option<usize>,
}
737
impl PluginPortDriver {
    /// Construct the control-plane driver for a plugin port.
    ///
    /// * `port_name` - asyn port name of this plugin (also stored in PortName_RBV).
    /// * `plugin_type_name` - value written to the PLUGIN_TYPE param.
    /// * `queue_size` - initial QueueSize param value.
    /// * `ndarray_port` - source port name; stored in NDArrayPort when non-empty.
    /// * `max_addr` - number of sub-addresses on the port.
    /// * `param_change_tx` - channel forwarding param changes to the data plane.
    /// * `processor` - plugin logic; gets a chance to register its own params.
    /// * `array_data` - when `Some`, a STD_ARRAY_DATA param is created (StdArrays).
    fn new<P: NDPluginProcess>(
        port_name: &str,
        plugin_type_name: &str,
        queue_size: usize,
        ndarray_port: &str,
        max_addr: usize,
        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
        processor: &mut P,
        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    ) -> AsynResult<Self> {
        let mut base = PortDriverBase::new(
            port_name,
            max_addr,
            PortFlags {
                can_block: true,
                ..Default::default()
            },
        );

        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
        let plugin_params = PluginBaseParams::create(&mut base)?;

        // Set defaults (EnableCallbacks=0: Disable by default, matching EPICS ADCore)
        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
        // File-related params default to inactive/empty values.
        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
        base.set_int32_param(ndarray_params.capture, 0, 0)?;
        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;

        // Set plugin identity params
        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
        base.set_string_param(
            ndarray_params.ad_core_version,
            0,
            env!("CARGO_PKG_VERSION").into(),
        )?;
        base.set_string_param(
            ndarray_params.driver_version,
            0,
            env!("CARGO_PKG_VERSION").into(),
        )?;
        if !ndarray_port.is_empty() {
            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
        }

        // Create STD_ARRAY_DATA param for StdArrays plugins (triggers I/O Intr on ArrayData waveform)
        let std_array_data_param = if array_data.is_some() {
            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
        } else {
            None
        };

        // Let the processor register its plugin-specific params
        processor.register_params(&mut base)?;

        Ok(Self {
            base,
            ndarray_params,
            plugin_params,
            param_change_tx,
            array_data,
            std_array_data_param,
        })
    }
}
819
/// Copy source slice directly into destination buffer, returning elements copied.
/// Copies `min(src.len(), dst.len())` elements; any remaining destination
/// elements are left untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = src.len().min(dst.len());
    let (head, _tail) = dst.split_at_mut(count);
    head.copy_from_slice(&src[..count]);
    count
}
826
827/// Convert and copy source slice into destination buffer element-by-element.
828fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
829where
830    S: CastToF64 + Copy,
831    D: CastFromF64 + Copy,
832{
833    let n = src.len().min(dst.len());
834    for i in 0..n {
835        dst[i] = D::cast_from_f64(src[i].cast_to_f64());
836    }
837    n
838}
839
/// Helper trait for `as f64` casts (handles lossy conversions like i64/u64).
trait CastToF64 {
    /// Widen (or pass through) `self` as an `f64` via an `as` cast.
    /// For i64/u64 values above 2^53 this rounds to the nearest representable f64.
    fn cast_to_f64(self) -> f64;
}

/// Generate the identical `self as f64` impls for every supported numeric type.
/// Replaces ten hand-written, byte-for-byte identical impl blocks.
macro_rules! impl_cast_to_f64 {
    ($($t:ty),* $(,)?) => {
        $(
            impl CastToF64 for $t {
                #[inline]
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )*
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32, f64);
895
/// Helper trait for `as` casts from f64.
trait CastFromF64 {
    /// Narrow `v` via an `as` cast: float-to-int casts truncate toward zero
    /// and saturate at the target type's bounds (Rust 1.45+ semantics).
    fn cast_from_f64(v: f64) -> Self;
}

/// Generate the identical `v as $t` impls for every destination buffer type.
/// Only the element types of the asyn read-array interfaces are covered,
/// matching the original hand-written set.
macro_rules! impl_cast_from_f64 {
    ($($t:ty),* $(,)?) => {
        $(
            impl CastFromF64 for $t {
                #[inline]
                fn cast_from_f64(v: f64) -> Self {
                    v as $t
                }
            }
        )*
    };
}

impl_cast_from_f64!(i8, i16, i32, f32, f64);
926
/// Copy NDArray data into the output buffer with type conversion.
/// Returns the number of elements copied, or 0 if no data is available.
///
/// `$direct_variant` is the `NDDataBuffer` variant whose element type matches
/// the destination buffer exactly (memcpy path via `copy_direct`); every
/// `$variant` listed after it is converted element-wise through f64 via
/// `copy_convert`. Together the variants must cover the whole enum, or the
/// generated `match` will not compile.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        // No array-data handle wired up for this plugin: nothing to read.
        let handle = match &$self.array_data {
            Some(h) => h,
            None => return Ok(0),
        };
        let guard = handle.lock();
        // Handle exists but no array has been stored in it yet.
        let array = match &*guard {
            Some(a) => a,
            None => return Ok(0),
        };
        let n = match &array.data {
            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
        };
        Ok(n)
    }};
}
948
949impl PortDriver for PluginPortDriver {
950    fn base(&self) -> &PortDriverBase {
951        &self.base
952    }
953
954    fn base_mut(&mut self) -> &mut PortDriverBase {
955        &mut self.base
956    }
957
958    fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
959        let reason = user.reason;
960        let addr = user.addr;
961        self.base.set_int32_param(reason, addr, value)?;
962        self.base.call_param_callbacks(addr)?;
963        let _ = self
964            .param_change_tx
965            .try_send((reason, addr, ParamChangeValue::Int32(value)));
966        Ok(())
967    }
968
969    fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
970        let reason = user.reason;
971        let addr = user.addr;
972        self.base.set_float64_param(reason, addr, value)?;
973        self.base.call_param_callbacks(addr)?;
974        let _ = self
975            .param_change_tx
976            .try_send((reason, addr, ParamChangeValue::Float64(value)));
977        Ok(())
978    }
979
980    fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
981        let reason = user.reason;
982        let addr = user.addr;
983        let s = String::from_utf8_lossy(data).into_owned();
984        self.base.set_string_param(reason, addr, s.clone())?;
985        self.base.call_param_callbacks(addr)?;
986        let _ = self
987            .param_change_tx
988            .try_send((reason, addr, ParamChangeValue::Octet(s)));
989        Ok(())
990    }
991
992    fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
993        impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
994    }
995
996    fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
997        impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
998    }
999
1000    fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1001        impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
1002    }
1003
1004    fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1005        impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
1006    }
1007
1008    fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1009        impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
1010    }
1011}
1012
/// Handle to a running plugin runtime. Provides access to sender and port handle.
#[derive(Clone)]
pub struct PluginRuntimeHandle {
    // Control plane: param I/O through the port actor thread.
    port_runtime: PortRuntimeHandle,
    // Data-plane input: arrays published through this sender reach the plugin.
    array_sender: NDArraySender,
    // Downstream fan-out; senders added here (see `wire_downstream`) receive
    // this plugin's output arrays.
    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    // Name this runtime's port was created with.
    port_name: String,
    /// Reason indices of the standard NDArray driver params.
    pub ndarray_params: NDArrayDriverParams,
    /// Reason indices of the plugin-base params (EnableCallbacks, NDArrayPort, ...).
    pub plugin_params: PluginBaseParams,
}
1023
impl PluginRuntimeHandle {
    /// Control-plane handle for parameter reads/writes via the port actor.
    pub fn port_runtime(&self) -> &PortRuntimeHandle {
        &self.port_runtime
    }

    /// Sender used to publish NDArrays into this plugin's data plane.
    pub fn array_sender(&self) -> &NDArraySender {
        &self.array_sender
    }

    /// Shared output fan-out; add downstream senders here to receive this
    /// plugin's processed arrays (see `wire_downstream`).
    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
        &self.array_output
    }

    /// Name of the port this runtime was created with.
    pub fn port_name(&self) -> &str {
        &self.port_name
    }
}
1041
1042/// Create a plugin runtime with control plane (PortActor) and data plane (processing thread).
1043///
1044/// Returns:
1045/// - `PluginRuntimeHandle` for wiring and control
1046/// - `PortRuntimeHandle` for param I/O
1047/// - `JoinHandle` for the data processing thread
1048pub fn create_plugin_runtime<P: NDPluginProcess>(
1049    port_name: &str,
1050    processor: P,
1051    pool: Arc<NDArrayPool>,
1052    queue_size: usize,
1053    ndarray_port: &str,
1054    wiring: Arc<WiringRegistry>,
1055) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1056    create_plugin_runtime_multi_addr(
1057        port_name,
1058        processor,
1059        pool,
1060        queue_size,
1061        ndarray_port,
1062        wiring,
1063        1,
1064    )
1065}
1066
/// Create a plugin runtime with multi-addr support.
///
/// `max_addr` specifies the number of addresses (sub-devices) the port supports.
///
/// Construction sequence (order matters):
/// 1. open the param-change channel so the driver can notify the data plane;
/// 2. build the `PluginPortDriver` (registers all params);
/// 3. snapshot the reason indices the data loop needs, before the driver is
///    moved into the port actor;
/// 4. start the port actor (control plane), open the array channel, then
///    spawn the data thread running `plugin_data_loop`.
pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
    port_name: &str,
    mut processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
    max_addr: usize,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    // Param change channel (control plane -> data plane)
    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);

    // Capture plugin type and array data handle before mutable borrow
    let plugin_type_name = processor.plugin_type().to_string();
    let array_data = processor.array_data_handle();

    // Create the port driver for control plane
    let driver = PluginPortDriver::new(
        port_name,
        &plugin_type_name,
        queue_size,
        ndarray_port,
        max_addr,
        param_tx,
        &mut processor,
        array_data,
    )
    .expect("failed to create plugin port driver");

    // Reason indices needed by the data loop; copied out here because `driver`
    // is consumed by `create_port_runtime` below.
    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
    let min_callback_time_reason = driver.plugin_params.min_callback_time;
    let sort_mode_reason = driver.plugin_params.sort_mode;
    let sort_time_reason = driver.plugin_params.sort_time;
    let sort_size_reason = driver.plugin_params.sort_size;
    let ndarray_params = driver.ndarray_params;
    let plugin_params = driver.plugin_params;
    let std_array_data_param = driver.std_array_data_param;

    // Create port runtime (actor thread for param I/O)
    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());

    // Clone port handle for the data thread to write params back
    let port_handle = port_runtime.port_handle().clone();

    // Array channel (data plane)
    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);

    // Shared mode flags; both start false (callbacks disabled, non-blocking)
    // and are flipped by the data loop when the corresponding params change.
    let enabled = Arc::new(AtomicBool::new(false));
    let blocking_mode = Arc::new(AtomicBool::new(false));

    // Shared processor (accessible from data thread)
    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
    let array_output_for_handle = array_output.clone();
    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
        processor,
        output: array_output,
        pool,
        ndarray_params,
        plugin_params,
        port_handle,
        array_counter: 0,
        std_array_data_param,
        min_callback_time: 0.0,
        last_process_time: None,
        sort_mode: 0,
        sort_time: 0.0,
        sort_size: 10,
        sort_buffer: SortBuffer::new(),
    }));

    let data_enabled = enabled.clone();
    let data_blocking = blocking_mode.clone();

    // The sender consults the flags to drop/route arrays without a lock.
    let mut array_sender = array_sender;
    array_sender.set_mode_flags(enabled, blocking_mode);

    // Capture wiring info for data loop
    let nd_array_port_reason = plugin_params.nd_array_port;
    let sender_port_name = port_name.to_string();
    let initial_upstream = ndarray_port.to_string();

    // Spawn data processing thread
    let data_jh = thread::Builder::new()
        .name(format!("plugin-data-{port_name}"))
        .spawn(move || {
            plugin_data_loop(
                shared,
                array_rx,
                param_rx,
                enable_callbacks_reason,
                blocking_callbacks_reason,
                min_callback_time_reason,
                sort_mode_reason,
                sort_time_reason,
                sort_size_reason,
                data_enabled,
                data_blocking,
                nd_array_port_reason,
                sender_port_name,
                initial_upstream,
                wiring,
            );
        })
        .expect("failed to spawn plugin data thread");

    let handle = PluginRuntimeHandle {
        port_runtime,
        array_sender,
        array_output: array_output_for_handle,
        port_name: port_name.to_string(),
        ndarray_params,
        plugin_params,
    };

    (handle, data_jh)
}
1188
/// Data-plane event loop for one plugin; runs on a dedicated thread with its
/// own current-thread tokio runtime.
///
/// Services three event sources via `select!`:
/// - arrays from `array_rx`: processed under the shared lock, then published
///   downstream and flushed outside the lock;
/// - param changes from `param_rx`: mode flags, min-callback-time, sort
///   config, NDArrayPort rewiring, plus the processor's `on_param_change`;
/// - a periodic sort-flush timer, active only while `sort_flush_active`.
///
/// The loop exits (and the thread ends) when either channel closes.
///
/// Locking discipline: `shared` is a synchronous mutex, so every lock scope
/// ends before the next `.await`; only cloned senders/handles cross an await.
fn plugin_data_loop<P: NDPluginProcess>(
    shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
    mut array_rx: NDArrayReceiver,
    mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
    enable_callbacks_reason: usize,
    blocking_callbacks_reason: usize,
    min_callback_time_reason: usize,
    sort_mode_reason: usize,
    sort_time_reason: usize,
    sort_size_reason: usize,
    enabled: Arc<AtomicBool>,
    blocking_mode: Arc<AtomicBool>,
    nd_array_port_reason: usize,
    sender_port_name: String,
    initial_upstream: String,
    wiring: Arc<WiringRegistry>,
) {
    let mut current_upstream = initial_upstream;
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        // Sort flush timer — starts disabled (very long interval).
        // Re-created when sort_time changes.
        let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
        let mut sort_flush_active = false;

        loop {
            tokio::select! {
                msg = array_rx.recv_msg() => {
                    match msg {
                        Some(msg) => {
                            // Process array and collect output (sync, under lock).
                            let (process_output, senders, port) = {
                                let mut guard = shared.lock();
                                let output = guard.process_and_publish(&msg.array);
                                let senders = guard.output.lock().senders_clone();
                                let port = guard.port_handle.clone();
                                (output, senders, port)
                            };
                            // msg dropped here → completion signaled (if tracked)
                            // Publish arrays and flush params outside the lock, in async context.
                            if let Some(po) = process_output {
                                po.publish_arrays(&senders).await;
                                po.batch.flush(&port).await;
                            }
                        }
                        // Channel closed: all senders dropped, shut down.
                        None => break,
                    }
                }
                param = param_rx.recv() => {
                    match param {
                        Some((reason, addr, value)) => {
                            // Mirror mode params into the lock-free flags the
                            // sender consults.
                            if reason == enable_callbacks_reason {
                                enabled.store(value.as_i32() != 0, Ordering::Release);
                            }
                            if reason == blocking_callbacks_reason {
                                blocking_mode.store(value.as_i32() != 0, Ordering::Release);
                            }
                            // Handle MinCallbackTime param change
                            if reason == min_callback_time_reason {
                                shared.lock().min_callback_time = value.as_f64();
                            }
                            // Handle sort param changes
                            if reason == sort_mode_reason {
                                let mode = value.as_i32();
                                // Scope the guard so clippy can verify the lock
                                // is released before any await.
                                let flush_work = {
                                    let mut guard = shared.lock();
                                    guard.sort_mode = mode;
                                    if mode == 0 {
                                        // Sorting turned off: drain whatever is
                                        // buffered and stop the flush timer.
                                        let output = guard.flush_sort_buffer();
                                        let senders = guard.output.lock().senders_clone();
                                        let port = guard.port_handle.clone();
                                        sort_flush_active = false;
                                        Some((output, senders, port))
                                    } else {
                                        // Sorting on: arm the timer only if a
                                        // positive flush period is configured.
                                        sort_flush_active = guard.sort_time > 0.0;
                                        if sort_flush_active {
                                            let dur = std::time::Duration::from_secs_f64(guard.sort_time);
                                            sort_flush_interval = tokio::time::interval(dur);
                                        }
                                        None
                                    }
                                };
                                if let Some((output, senders, port)) = flush_work {
                                    output.publish_arrays(&senders).await;
                                    output.batch.flush(&port).await;
                                }
                            }
                            if reason == sort_time_reason {
                                let t = value.as_f64();
                                let mut guard = shared.lock();
                                guard.sort_time = t;
                                if guard.sort_mode != 0 && t > 0.0 {
                                    sort_flush_active = true;
                                    let dur = std::time::Duration::from_secs_f64(t);
                                    sort_flush_interval = tokio::time::interval(dur);
                                } else {
                                    sort_flush_active = false;
                                }
                                drop(guard);
                            }
                            if reason == sort_size_reason {
                                shared.lock().sort_size = value.as_i32();
                            }
                            // Handle NDArrayPort rewiring
                            if reason == nd_array_port_reason {
                                if let Some(new_port) = value.as_string() {
                                    if new_port != current_upstream {
                                        let old = std::mem::replace(&mut current_upstream, new_port.to_string());
                                        if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
                                            // Rewire failed: report and keep the
                                            // previous upstream as current.
                                            eprintln!("NDArrayPort rewire failed: {e}");
                                            current_upstream = old;
                                        }
                                    }
                                }
                            }
                            // Every param change (not just the ones above) is
                            // forwarded to the processor with a snapshot of the
                            // current enable flag.
                            let snapshot = PluginParamSnapshot {
                                enable_callbacks: enabled.load(Ordering::Acquire),
                                reason,
                                addr,
                                value,
                            };
                            let (process_output, senders, port) = {
                                let mut guard = shared.lock();
                                let t0 = std::time::Instant::now();
                                let result = guard.processor.on_param_change(reason, &snapshot);
                                let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
                                // Only build a publish batch if the callback
                                // actually produced arrays or param updates.
                                let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
                                    Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
                                } else {
                                    None
                                };
                                let senders = guard.output.lock().senders_clone();
                                (output, senders, guard.port_handle.clone())
                            };
                            if let Some(po) = process_output {
                                po.publish_arrays(&senders).await;
                                po.batch.flush(&port).await;
                            }
                        }
                        None => break,
                    }
                }
                // Periodic sort flush; guarded so a disabled timer never fires.
                _ = sort_flush_interval.tick(), if sort_flush_active => {
                    let (output, senders, port) = {
                        let mut guard = shared.lock();
                        let output = guard.flush_sort_buffer();
                        let senders = guard.output.lock().senders_clone();
                        let port = guard.port_handle.clone();
                        (output, senders, port)
                    };
                    output.publish_arrays(&senders).await;
                    output.batch.flush(&port).await;
                }
            }
        }
    });
}
1351
1352/// Connect a downstream plugin's sender to a plugin runtime's output.
1353pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1354    upstream.array_output().lock().add(downstream_sender);
1355}
1356
/// Create a plugin runtime with a pre-wired output (for testing and direct wiring).
///
/// Same construction sequence as `create_plugin_runtime_multi_addr`, but with
/// a caller-supplied `NDArrayOutput` (downstream senders already attached) and
/// a fixed single address.
pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
    port_name: &str,
    mut processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    output: NDArrayOutput,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    // Param change channel (control plane -> data plane).
    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);

    // Capture plugin type and array data handle before the mutable borrow.
    let plugin_type_name = processor.plugin_type().to_string();
    let array_data = processor.array_data_handle();
    let driver = PluginPortDriver::new(
        port_name,
        &plugin_type_name,
        queue_size,
        ndarray_port,
        1,
        param_tx,
        &mut processor,
        array_data,
    )
    .expect("failed to create plugin port driver");

    // Reason indices needed by the data loop; copied out before `driver` is
    // consumed by `create_port_runtime`.
    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
    let min_callback_time_reason = driver.plugin_params.min_callback_time;
    let sort_mode_reason = driver.plugin_params.sort_mode;
    let sort_time_reason = driver.plugin_params.sort_time;
    let sort_size_reason = driver.plugin_params.sort_size;
    let ndarray_params = driver.ndarray_params;
    let plugin_params = driver.plugin_params;
    let std_array_data_param = driver.std_array_data_param;

    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());

    // Clone port handle for the data thread to write params back.
    let port_handle = port_runtime.port_handle().clone();

    // Array channel (data plane).
    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);

    // Mode flags start false (callbacks disabled, non-blocking).
    let enabled = Arc::new(AtomicBool::new(false));
    let blocking_mode = Arc::new(AtomicBool::new(false));

    // Unlike the multi-addr constructor, the caller's output is used directly.
    let array_output = Arc::new(parking_lot::Mutex::new(output));
    let array_output_for_handle = array_output.clone();
    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
        processor,
        output: array_output,
        pool,
        ndarray_params,
        plugin_params,
        port_handle,
        array_counter: 0,
        std_array_data_param,
        min_callback_time: 0.0,
        last_process_time: None,
        sort_mode: 0,
        sort_time: 0.0,
        sort_size: 10,
        sort_buffer: SortBuffer::new(),
    }));

    let data_enabled = enabled.clone();
    let data_blocking = blocking_mode.clone();

    // The sender consults the flags to drop/route arrays without a lock.
    let mut array_sender = array_sender;
    array_sender.set_mode_flags(enabled, blocking_mode);

    // Capture wiring info for data loop
    let nd_array_port_reason = plugin_params.nd_array_port;
    let sender_port_name = port_name.to_string();
    let initial_upstream = ndarray_port.to_string();

    let data_jh = thread::Builder::new()
        .name(format!("plugin-data-{port_name}"))
        .spawn(move || {
            plugin_data_loop(
                shared,
                array_rx,
                param_rx,
                enable_callbacks_reason,
                blocking_callbacks_reason,
                min_callback_time_reason,
                sort_mode_reason,
                sort_time_reason,
                sort_size_reason,
                data_enabled,
                data_blocking,
                nd_array_port_reason,
                sender_port_name,
                initial_upstream,
                wiring,
            );
        })
        .expect("failed to spawn plugin data thread");

    let handle = PluginRuntimeHandle {
        port_runtime,
        array_sender,
        array_output: array_output_for_handle,
        port_name: port_name.to_string(),
        ndarray_params,
        plugin_params,
    };

    (handle, data_jh)
}
1466
1467#[cfg(test)]
1468mod tests {
1469    use super::*;
1470    use crate::ndarray::{NDDataType, NDDimension};
1471    use crate::plugin::channel::ndarray_channel;
1472
1473    /// Passthrough processor: returns the input array as-is.
1474    struct PassthroughProcessor;
1475
1476    impl NDPluginProcess for PassthroughProcessor {
1477        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1478            ProcessResult::arrays(vec![Arc::new(array.clone())])
1479        }
1480        fn plugin_type(&self) -> &str {
1481            "Passthrough"
1482        }
1483    }
1484
1485    /// Sink processor: consumes arrays, returns nothing.
1486    struct SinkProcessor {
1487        count: usize,
1488    }
1489
1490    impl NDPluginProcess for SinkProcessor {
1491        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1492            self.count += 1;
1493            ProcessResult::empty()
1494        }
1495        fn plugin_type(&self) -> &str {
1496            "Sink"
1497        }
1498    }
1499
1500    fn make_test_array(id: i32) -> Arc<NDArray> {
1501        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1502        arr.unique_id = id;
1503        Arc::new(arr)
1504    }
1505
1506    fn test_wiring() -> Arc<WiringRegistry> {
1507        Arc::new(WiringRegistry::new())
1508    }
1509
1510    /// Enable callbacks on a plugin handle (plugins default to disabled).
1511    fn enable_callbacks(handle: &PluginRuntimeHandle) {
1512        handle
1513            .port_runtime()
1514            .port_handle()
1515            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1516            .unwrap();
1517        std::thread::sleep(std::time::Duration::from_millis(10));
1518    }
1519
1520    /// Send an array via the sender from a sync test context.
1521    /// Uses a dedicated thread with a current-thread runtime to avoid
1522    /// interfering with the plugin's own runtime.
1523    fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
1524        let sender = sender.clone();
1525        let jh = std::thread::spawn(move || {
1526            let rt = tokio::runtime::Builder::new_current_thread()
1527                .enable_all()
1528                .build()
1529                .unwrap();
1530            rt.block_on(sender.publish(array));
1531        });
1532        jh.join().unwrap();
1533    }
1534
1535    #[test]
1536    fn test_passthrough_runtime() {
1537        let pool = Arc::new(NDArrayPool::new(1_000_000));
1538
1539        // Create downstream receiver
1540        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1541        let mut output = NDArrayOutput::new();
1542        output.add(downstream_sender);
1543
1544        let (handle, _data_jh) = create_plugin_runtime_with_output(
1545            "PASS1",
1546            PassthroughProcessor,
1547            pool,
1548            10,
1549            output,
1550            "",
1551            test_wiring(),
1552        );
1553        enable_callbacks(&handle);
1554
1555        // Send an array
1556        send_array(handle.array_sender(), make_test_array(42));
1557
1558        // Should come out the other side
1559        let received = downstream_rx.blocking_recv().unwrap();
1560        assert_eq!(received.unique_id, 42);
1561    }
1562
1563    #[test]
1564    fn test_sink_runtime() {
1565        let pool = Arc::new(NDArrayPool::new(1_000_000));
1566
1567        let (handle, _data_jh) = create_plugin_runtime(
1568            "SINK1",
1569            SinkProcessor { count: 0 },
1570            pool,
1571            10,
1572            "",
1573            test_wiring(),
1574        );
1575        enable_callbacks(&handle);
1576
1577        // Send arrays - they should be consumed silently
1578        send_array(handle.array_sender(), make_test_array(1));
1579        send_array(handle.array_sender(), make_test_array(2));
1580
1581        // Give processing thread time
1582        std::thread::sleep(std::time::Duration::from_millis(50));
1583
1584        // No crash, no output needed
1585        assert_eq!(handle.port_name(), "SINK1");
1586    }
1587
1588    #[test]
1589    fn test_plugin_type_param() {
1590        let pool = Arc::new(NDArrayPool::new(1_000_000));
1591
1592        let (handle, _data_jh) = create_plugin_runtime(
1593            "TYPE_TEST",
1594            PassthroughProcessor,
1595            pool,
1596            10,
1597            "",
1598            test_wiring(),
1599        );
1600
1601        // Verify port name
1602        assert_eq!(handle.port_name(), "TYPE_TEST");
1603        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1604    }
1605
1606    #[test]
1607    fn test_shutdown_on_handle_drop() {
1608        let pool = Arc::new(NDArrayPool::new(1_000_000));
1609
1610        let (handle, data_jh) = create_plugin_runtime(
1611            "SHUTDOWN_TEST",
1612            PassthroughProcessor,
1613            pool,
1614            10,
1615            "",
1616            test_wiring(),
1617        );
1618
1619        // Drop the handle (closes sender channel, which should cause data thread to exit)
1620        let sender = handle.array_sender().clone();
1621        drop(handle);
1622        drop(sender);
1623
1624        // Data thread should terminate
1625        let result = data_jh.join();
1626        assert!(result.is_ok());
1627    }
1628
1629    #[test]
1630    fn test_nonblocking_passthrough() {
1631        let pool = Arc::new(NDArrayPool::new(1_000_000));
1632        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1633        let mut output = NDArrayOutput::new();
1634        output.add(downstream_sender);
1635
1636        let (handle, _data_jh) = create_plugin_runtime_with_output(
1637            "NB_TEST",
1638            PassthroughProcessor,
1639            pool,
1640            10,
1641            output,
1642            "",
1643            test_wiring(),
1644        );
1645        enable_callbacks(&handle);
1646
1647        send_array(handle.array_sender(), make_test_array(42));
1648
1649        let received = downstream_rx.blocking_recv().unwrap();
1650        assert_eq!(received.unique_id, 42);
1651    }
1652
1653    #[test]
1654    fn test_blocking_to_nonblocking_switch() {
1655        let pool = Arc::new(NDArrayPool::new(1_000_000));
1656        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1657        let mut output = NDArrayOutput::new();
1658        output.add(downstream_sender);
1659
1660        let (handle, _data_jh) = create_plugin_runtime_with_output(
1661            "SWITCH_TEST",
1662            PassthroughProcessor,
1663            pool,
1664            10,
1665            output,
1666            "",
1667            test_wiring(),
1668        );
1669        enable_callbacks(&handle);
1670
1671        // Start in blocking mode
1672        handle
1673            .port_runtime()
1674            .port_handle()
1675            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1676            .unwrap();
1677        std::thread::sleep(std::time::Duration::from_millis(50));
1678
1679        send_array(handle.array_sender(), make_test_array(1));
1680        let received = downstream_rx.blocking_recv().unwrap();
1681        assert_eq!(received.unique_id, 1);
1682
1683        // Switch back to non-blocking
1684        handle
1685            .port_runtime()
1686            .port_handle()
1687            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1688            .unwrap();
1689        std::thread::sleep(std::time::Duration::from_millis(50));
1690
1691        // Send in non-blocking mode — goes through channel to data thread
1692        send_array(handle.array_sender(), make_test_array(2));
1693        let received = downstream_rx.blocking_recv().unwrap();
1694        assert_eq!(received.unique_id, 2);
1695    }
1696
1697    #[test]
1698    fn test_enable_callbacks_disables_processing() {
1699        let pool = Arc::new(NDArrayPool::new(1_000_000));
1700        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1701        let mut output = NDArrayOutput::new();
1702        output.add(downstream_sender);
1703
1704        let (handle, _data_jh) = create_plugin_runtime_with_output(
1705            "ENABLE_TEST",
1706            PassthroughProcessor,
1707            pool,
1708            10,
1709            output,
1710            "",
1711            test_wiring(),
1712        );
1713
1714        // Disable callbacks
1715        handle
1716            .port_runtime()
1717            .port_handle()
1718            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1719            .unwrap();
1720        std::thread::sleep(std::time::Duration::from_millis(50));
1721
1722        // Send array — should be silently dropped by sender (callbacks disabled)
1723        send_array(handle.array_sender(), make_test_array(99));
1724
1725        // Verify nothing received (with timeout)
1726        let rt = tokio::runtime::Builder::new_current_thread()
1727            .enable_all()
1728            .build()
1729            .unwrap();
1730        let result = rt.block_on(async {
1731            tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1732        });
1733        assert!(
1734            result.is_err(),
1735            "should not receive array when callbacks disabled"
1736        );
1737    }
1738
1739    #[test]
1740    fn test_downstream_receives_multiple() {
1741        let pool = Arc::new(NDArrayPool::new(1_000_000));
1742
1743        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1744        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1745        let mut output = NDArrayOutput::new();
1746        output.add(ds1);
1747        output.add(ds2);
1748
1749        let (handle, _data_jh) = create_plugin_runtime_with_output(
1750            "DS_TEST",
1751            PassthroughProcessor,
1752            pool,
1753            10,
1754            output,
1755            "",
1756            test_wiring(),
1757        );
1758        enable_callbacks(&handle);
1759
1760        send_array(handle.array_sender(), make_test_array(77));
1761
1762        // Both downstream receivers should have the array
1763        let r1 = rx1.blocking_recv().unwrap();
1764        let r2 = rx2.blocking_recv().unwrap();
1765        assert_eq!(r1.unique_id, 77);
1766        assert_eq!(r2.unique_id, 77);
1767    }
1768
1769    #[test]
1770    fn test_param_updates_after_send() {
1771        let pool = Arc::new(NDArrayPool::new(1_000_000));
1772
1773        struct ParamTracker;
1774        impl NDPluginProcess for ParamTracker {
1775            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1776                ProcessResult::arrays(vec![Arc::new(array.clone())])
1777            }
1778            fn plugin_type(&self) -> &str {
1779                "ParamTracker"
1780            }
1781        }
1782
1783        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1784        let mut output = NDArrayOutput::new();
1785        output.add(downstream_sender);
1786
1787        let (handle, _data_jh) = create_plugin_runtime_with_output(
1788            "PARAM_TEST",
1789            ParamTracker,
1790            pool,
1791            10,
1792            output,
1793            "",
1794            test_wiring(),
1795        );
1796        enable_callbacks(&handle);
1797
1798        // Send array
1799        send_array(handle.array_sender(), make_test_array(1));
1800        let received = downstream_rx.blocking_recv().unwrap();
1801        assert_eq!(received.unique_id, 1);
1802
1803        // Write enable_callbacks — should not crash
1804        handle
1805            .port_runtime()
1806            .port_handle()
1807            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1808            .unwrap();
1809        std::thread::sleep(std::time::Duration::from_millis(50));
1810
1811        // Still works after param update
1812        send_array(handle.array_sender(), make_test_array(2));
1813        let received = downstream_rx.blocking_recv().unwrap();
1814        assert_eq!(received.unique_id, 2);
1815    }
1816
1817    #[test]
1818    fn test_sort_buffer_reorders_by_unique_id() {
1819        let mut buf = SortBuffer::new();
1820
1821        // Insert out of order: 3, 1, 2
1822        buf.insert(3, vec![make_test_array(3)], 10);
1823        buf.insert(1, vec![make_test_array(1)], 10);
1824        buf.insert(2, vec![make_test_array(2)], 10);
1825
1826        assert_eq!(buf.len(), 3);
1827
1828        let drained = buf.drain_all();
1829        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1830        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
1831        assert_eq!(buf.len(), 0);
1832        assert_eq!(buf.last_emitted_id, 3);
1833    }
1834
1835    #[test]
1836    fn test_sort_buffer_detects_disordered() {
1837        let mut buf = SortBuffer::new();
1838
1839        // Emit id=5, then insert id=3 (which is less than last_emitted_id)
1840        buf.insert(5, vec![make_test_array(5)], 10);
1841        buf.drain_all(); // emits id=5, last_emitted_id=5
1842
1843        buf.insert(3, vec![make_test_array(3)], 10);
1844        assert_eq!(buf.disordered_arrays, 1);
1845    }
1846
1847    #[test]
1848    fn test_sort_buffer_drops_when_full() {
1849        let mut buf = SortBuffer::new();
1850
1851        // sort_size=2, insert 3 entries
1852        buf.insert(1, vec![make_test_array(1)], 2);
1853        buf.insert(2, vec![make_test_array(2)], 2);
1854        buf.insert(3, vec![make_test_array(3)], 2);
1855
1856        // Buffer should have 2 entries (oldest dropped)
1857        assert_eq!(buf.len(), 2);
1858        assert_eq!(buf.dropped_output_arrays, 1);
1859
1860        let drained = buf.drain_all();
1861        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1862        assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
1863    }
1864
1865    #[test]
1866    fn test_sort_mode_runtime_integration() {
1867        let pool = Arc::new(NDArrayPool::new(1_000_000));
1868        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1869        let mut output = NDArrayOutput::new();
1870        output.add(downstream_sender);
1871
1872        let (handle, _data_jh) = create_plugin_runtime_with_output(
1873            "SORT_TEST",
1874            PassthroughProcessor,
1875            pool,
1876            10,
1877            output,
1878            "",
1879            test_wiring(),
1880        );
1881        enable_callbacks(&handle);
1882
1883        // Enable sort mode with sort_size=10
1884        handle
1885            .port_runtime()
1886            .port_handle()
1887            .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
1888            .unwrap();
1889        handle
1890            .port_runtime()
1891            .port_handle()
1892            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
1893            .unwrap();
1894        std::thread::sleep(std::time::Duration::from_millis(50));
1895
1896        // Send arrays out of order
1897        send_array(handle.array_sender(), make_test_array(3));
1898        send_array(handle.array_sender(), make_test_array(1));
1899        send_array(handle.array_sender(), make_test_array(2));
1900        std::thread::sleep(std::time::Duration::from_millis(100));
1901
1902        // Arrays should be buffered, not yet received
1903        let rt = tokio::runtime::Builder::new_current_thread()
1904            .enable_all()
1905            .build()
1906            .unwrap();
1907        let result = rt.block_on(async {
1908            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
1909        });
1910        assert!(
1911            result.is_err(),
1912            "arrays should be buffered while sort mode is active"
1913        );
1914
1915        // Disable sort mode — should flush all buffered arrays in order
1916        handle
1917            .port_runtime()
1918            .port_handle()
1919            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 0)
1920            .unwrap();
1921        std::thread::sleep(std::time::Duration::from_millis(100));
1922
1923        // Receive all flushed arrays — they should arrive in sorted order
1924        let r1 = downstream_rx.blocking_recv().unwrap();
1925        let r2 = downstream_rx.blocking_recv().unwrap();
1926        let r3 = downstream_rx.blocking_recv().unwrap();
1927        assert_eq!(r1.unique_id, 1);
1928        assert_eq!(r2.unique_id, 2);
1929        assert_eq!(r3.unique_id, 3);
1930    }
1931}