1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
19use super::params::PluginBaseParams;
20use super::wiring::WiringRegistry;
21
22#[derive(Debug, Clone)]
24pub enum ParamChangeValue {
25 Int32(i32),
26 Float64(f64),
27 Octet(String),
28}
29
30impl ParamChangeValue {
31 pub fn as_i32(&self) -> i32 {
32 match self {
33 ParamChangeValue::Int32(v) => *v,
34 ParamChangeValue::Float64(v) => *v as i32,
35 ParamChangeValue::Octet(_) => 0,
36 }
37 }
38
39 pub fn as_f64(&self) -> f64 {
40 match self {
41 ParamChangeValue::Int32(v) => *v as f64,
42 ParamChangeValue::Float64(v) => *v,
43 ParamChangeValue::Octet(_) => 0.0,
44 }
45 }
46
47 pub fn as_string(&self) -> Option<&str> {
48 match self {
49 ParamChangeValue::Octet(s) => Some(s),
50 _ => None,
51 }
52 }
53}
54
55pub enum ParamUpdate {
57 Int32 {
58 reason: usize,
59 addr: i32,
60 value: i32,
61 },
62 Float64 {
63 reason: usize,
64 addr: i32,
65 value: f64,
66 },
67 Octet {
68 reason: usize,
69 addr: i32,
70 value: String,
71 },
72 Float64Array {
73 reason: usize,
74 addr: i32,
75 value: Vec<f64>,
76 },
77}
78
79impl ParamUpdate {
80 pub fn int32(reason: usize, value: i32) -> Self {
82 Self::Int32 {
83 reason,
84 addr: 0,
85 value,
86 }
87 }
88 pub fn float64(reason: usize, value: f64) -> Self {
90 Self::Float64 {
91 reason,
92 addr: 0,
93 value,
94 }
95 }
96 pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
98 Self::Int32 {
99 reason,
100 addr,
101 value,
102 }
103 }
104 pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
106 Self::Float64 {
107 reason,
108 addr,
109 value,
110 }
111 }
112 pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
114 Self::Float64Array {
115 reason,
116 addr: 0,
117 value,
118 }
119 }
120 pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
122 Self::Float64Array {
123 reason,
124 addr,
125 value,
126 }
127 }
128}
129
130pub struct ProcessResult {
132 pub output_arrays: Vec<Arc<NDArray>>,
133 pub param_updates: Vec<ParamUpdate>,
134 pub scatter_index: Option<usize>,
136}
137
138impl ProcessResult {
139 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
141 Self {
142 output_arrays: vec![],
143 param_updates,
144 scatter_index: None,
145 }
146 }
147
148 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
150 Self {
151 output_arrays,
152 param_updates: vec![],
153 scatter_index: None,
154 }
155 }
156
157 pub fn empty() -> Self {
159 Self {
160 output_arrays: vec![],
161 param_updates: vec![],
162 scatter_index: None,
163 }
164 }
165
166 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
168 Self {
169 output_arrays,
170 param_updates: vec![],
171 scatter_index: Some(index),
172 }
173 }
174}
175
176pub struct ParamChangeResult {
178 pub output_arrays: Vec<Arc<NDArray>>,
179 pub param_updates: Vec<ParamUpdate>,
180}
181
182impl ParamChangeResult {
183 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
184 Self {
185 output_arrays: vec![],
186 param_updates,
187 }
188 }
189
190 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
191 Self {
192 output_arrays,
193 param_updates: vec![],
194 }
195 }
196
197 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
198 Self {
199 output_arrays,
200 param_updates,
201 }
202 }
203
204 pub fn empty() -> Self {
205 Self {
206 output_arrays: vec![],
207 param_updates: vec![],
208 }
209 }
210}
211
212pub trait NDPluginProcess: Send + 'static {
214 fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
216
217 fn plugin_type(&self) -> &str;
219
220 fn register_params(
222 &mut self,
223 _base: &mut PortDriverBase,
224 ) -> Result<(), asyn_rs::error::AsynError> {
225 Ok(())
226 }
227
228 fn on_param_change(
231 &mut self,
232 _reason: usize,
233 _params: &PluginParamSnapshot,
234 ) -> ParamChangeResult {
235 ParamChangeResult::empty()
236 }
237
238 fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
242 None
243 }
244}
245
246pub struct PluginParamSnapshot {
248 pub enable_callbacks: bool,
249 pub reason: usize,
251 pub addr: i32,
253 pub value: ParamChangeValue,
255}
256
257struct SortBuffer {
263 entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
265 last_emitted_id: i32,
267 disordered_arrays: i32,
269 dropped_output_arrays: i32,
271}
272
273impl SortBuffer {
274 fn new() -> Self {
275 Self {
276 entries: BTreeMap::new(),
277 last_emitted_id: 0,
278 disordered_arrays: 0,
279 dropped_output_arrays: 0,
280 }
281 }
282
283 fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
285 if unique_id < self.last_emitted_id {
286 self.disordered_arrays += 1;
287 }
288 self.entries.entry(unique_id).or_default().extend(arrays);
289
290 while sort_size > 0 && self.entries.len() as i32 > sort_size {
292 if let Some((&oldest_key, _)) = self.entries.iter().next() {
293 self.entries.remove(&oldest_key);
294 self.dropped_output_arrays += 1;
295 }
296 }
297 }
298
299 fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
301 let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
302 if let Some(&(last_id, _)) = entries.last() {
303 self.last_emitted_id = last_id;
304 }
305 entries
306 }
307
308 fn len(&self) -> i32 {
310 self.entries.len() as i32
311 }
312}
313
314struct SharedProcessorInner<P: NDPluginProcess> {
317 processor: P,
318 output: Arc<parking_lot::Mutex<NDArrayOutput>>,
319 pool: Arc<NDArrayPool>,
320 ndarray_params: NDArrayDriverParams,
321 plugin_params: PluginBaseParams,
322 port_handle: PortHandle,
323 array_counter: i32,
324 std_array_data_param: Option<usize>,
326 min_callback_time: f64,
328 last_process_time: Option<std::time::Instant>,
330 sort_mode: i32,
332 sort_time: f64,
334 sort_size: i32,
336 sort_buffer: SortBuffer,
338}
339
340impl<P: NDPluginProcess> SharedProcessorInner<P> {
341 fn should_throttle(&self) -> bool {
342 if self.min_callback_time <= 0.0 {
343 return false;
344 }
345 if let Some(last) = self.last_process_time {
346 last.elapsed().as_secs_f64() < self.min_callback_time
347 } else {
348 false
349 }
350 }
351
352 fn process_and_publish(&mut self, array: &NDArray) -> Option<ProcessOutput> {
356 if self.should_throttle() {
357 return None;
358 }
359 let t0 = std::time::Instant::now();
360 let result = self.processor.process_array(array, &self.pool);
361 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
362 self.last_process_time = Some(t0);
363
364 if self.sort_mode != 0 && !result.output_arrays.is_empty() {
365 let unique_id = array.unique_id;
366 self.sort_buffer
367 .insert(unique_id, result.output_arrays, self.sort_size);
368 let mut batch = self.build_sort_params_batch();
369 if !result.param_updates.is_empty() {
370 let frame_output = self.build_publish_batch(
371 vec![],
372 result.param_updates,
373 result.scatter_index,
374 Some(array),
375 elapsed_ms,
376 );
377 batch.merge(frame_output.batch);
378 }
379 Some(ProcessOutput {
380 arrays: vec![],
381 scatter_index: None,
382 batch,
383 })
384 } else {
385 Some(self.build_publish_batch(
386 result.output_arrays,
387 result.param_updates,
388 result.scatter_index,
389 Some(array),
390 elapsed_ms,
391 ))
392 }
393 }
394
395 fn flush_sort_buffer(&mut self) -> ProcessOutput {
397 let entries = self.sort_buffer.drain_all();
398 let mut all_arrays = Vec::new();
399 let mut combined = ParamBatch::empty();
400 for (_unique_id, arrays) in entries {
401 let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
402 all_arrays.extend(output.arrays);
403 combined.merge(output.batch);
404 }
405 combined.merge(self.build_sort_params_batch());
406 ProcessOutput {
407 arrays: all_arrays,
408 scatter_index: None,
409 batch: combined,
410 }
411 }
412
413 fn build_sort_params_batch(&self) -> ParamBatch {
414 use asyn_rs::request::ParamSetValue;
415 let sort_free = self.sort_size - self.sort_buffer.len();
416 ParamBatch {
417 addr0: vec![
418 ParamSetValue::Int32 {
419 reason: self.plugin_params.sort_free,
420 addr: 0,
421 value: sort_free,
422 },
423 ParamSetValue::Int32 {
424 reason: self.plugin_params.disordered_arrays,
425 addr: 0,
426 value: self.sort_buffer.disordered_arrays,
427 },
428 ParamSetValue::Int32 {
429 reason: self.plugin_params.dropped_output_arrays,
430 addr: 0,
431 value: self.sort_buffer.dropped_output_arrays,
432 },
433 ],
434 extra: std::collections::HashMap::new(),
435 }
436 }
437
438 fn build_publish_batch(
442 &mut self,
443 output_arrays: Vec<Arc<NDArray>>,
444 param_updates: Vec<ParamUpdate>,
445 scatter_index: Option<usize>,
446 fallback_array: Option<&NDArray>,
447 elapsed_ms: f64,
448 ) -> ProcessOutput {
449 use asyn_rs::request::ParamSetValue;
450
451 let mut addr0: Vec<ParamSetValue> = Vec::new();
452 let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
453 std::collections::HashMap::new();
454
455 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
456 self.array_counter += 1;
457
458 if let Some(param) = self.std_array_data_param {
460 use crate::ndarray::NDDataBuffer;
461 use asyn_rs::param::ParamValue;
462 let value = match &report_arr.data {
463 NDDataBuffer::I8(v) => {
464 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
465 }
466 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
467 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
468 ))),
469 NDDataBuffer::I16(v) => {
470 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
471 }
472 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
473 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
474 ))),
475 NDDataBuffer::I32(v) => {
476 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
477 }
478 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
479 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
480 ))),
481 NDDataBuffer::I64(v) => {
482 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
483 }
484 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
485 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
486 ))),
487 NDDataBuffer::F32(v) => {
488 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
489 }
490 NDDataBuffer::F64(v) => {
491 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
492 }
493 };
494 if let Some(value) = value {
495 let ts = report_arr.timestamp.to_system_time();
496 self.port_handle
497 .interrupts()
498 .notify(asyn_rs::interrupt::InterruptValue {
499 reason: param,
500 addr: 0,
501 value,
502 timestamp: ts,
503 uint32_changed_mask: 0,
504 });
505 }
506 }
507
508 let info = report_arr.info();
509 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
510 addr0.extend([
511 ParamSetValue::Int32 {
512 reason: self.ndarray_params.array_counter,
513 addr: 0,
514 value: self.array_counter,
515 },
516 ParamSetValue::Int32 {
517 reason: self.ndarray_params.unique_id,
518 addr: 0,
519 value: report_arr.unique_id,
520 },
521 ParamSetValue::Int32 {
522 reason: self.ndarray_params.n_dimensions,
523 addr: 0,
524 value: report_arr.dims.len() as i32,
525 },
526 ParamSetValue::Int32 {
527 reason: self.ndarray_params.array_size_x,
528 addr: 0,
529 value: info.x_size as i32,
530 },
531 ParamSetValue::Int32 {
532 reason: self.ndarray_params.array_size_y,
533 addr: 0,
534 value: info.y_size as i32,
535 },
536 ParamSetValue::Int32 {
537 reason: self.ndarray_params.array_size_z,
538 addr: 0,
539 value: info.color_size as i32,
540 },
541 ParamSetValue::Int32 {
542 reason: self.ndarray_params.array_size,
543 addr: 0,
544 value: info.total_bytes as i32,
545 },
546 ParamSetValue::Int32 {
547 reason: self.ndarray_params.data_type,
548 addr: 0,
549 value: report_arr.data.data_type() as i32,
550 },
551 ParamSetValue::Int32 {
552 reason: self.ndarray_params.color_mode,
553 addr: 0,
554 value: color_mode,
555 },
556 ParamSetValue::Float64 {
557 reason: self.ndarray_params.timestamp_rbv,
558 addr: 0,
559 value: report_arr.timestamp.as_f64(),
560 },
561 ParamSetValue::Int32 {
562 reason: self.ndarray_params.epics_ts_sec,
563 addr: 0,
564 value: report_arr.timestamp.sec as i32,
565 },
566 ParamSetValue::Int32 {
567 reason: self.ndarray_params.epics_ts_nsec,
568 addr: 0,
569 value: report_arr.timestamp.nsec as i32,
570 },
571 ]);
572 }
573
574 addr0.push(ParamSetValue::Float64 {
575 reason: self.plugin_params.execution_time,
576 addr: 0,
577 value: elapsed_ms,
578 });
579
580 for update in ¶m_updates {
585 match update {
586 ParamUpdate::Int32 {
587 reason,
588 addr,
589 value,
590 } => {
591 let pv = ParamSetValue::Int32 {
592 reason: *reason,
593 addr: *addr,
594 value: *value,
595 };
596 if *addr == 0 {
597 addr0.push(pv);
598 } else {
599 extra.entry(*addr).or_default().push(pv);
600 }
601 }
602 ParamUpdate::Float64 {
603 reason,
604 addr,
605 value,
606 } => {
607 let pv = ParamSetValue::Float64 {
608 reason: *reason,
609 addr: *addr,
610 value: *value,
611 };
612 if *addr == 0 {
613 addr0.push(pv);
614 } else {
615 extra.entry(*addr).or_default().push(pv);
616 }
617 }
618 ParamUpdate::Octet {
619 reason,
620 addr,
621 value,
622 } => {
623 let pv = ParamSetValue::Octet {
624 reason: *reason,
625 addr: *addr,
626 value: value.clone(),
627 };
628 if *addr == 0 {
629 addr0.push(pv);
630 } else {
631 extra.entry(*addr).or_default().push(pv);
632 }
633 }
634 ParamUpdate::Float64Array {
635 reason,
636 addr,
637 value,
638 } => {
639 let pv = ParamSetValue::Float64Array {
640 reason: *reason,
641 addr: *addr,
642 value: value.clone(),
643 };
644 if *addr == 0 {
645 addr0.push(pv);
646 } else {
647 extra.entry(*addr).or_default().push(pv);
648 }
649 }
650 }
651 }
652
653 ProcessOutput {
654 arrays: output_arrays,
655 scatter_index,
656 batch: ParamBatch { addr0, extra },
657 }
658 }
659}
660
661struct ProcessOutput {
663 arrays: Vec<Arc<NDArray>>,
664 scatter_index: Option<usize>,
665 batch: ParamBatch,
666}
667
668impl ProcessOutput {
669 async fn publish_arrays(&self, senders: &[NDArraySender]) {
675 for arr in &self.arrays {
676 if let Some(idx) = self.scatter_index {
677 if let Some(sender) = senders.get(idx % senders.len().max(1)) {
678 sender.publish(arr.clone()).await;
679 }
680 } else {
681 let futs = senders.iter().map(|s| s.publish(arr.clone()));
682 futures_util::future::join_all(futs).await;
683 }
684 }
685 }
686}
687
688struct ParamBatch {
691 addr0: Vec<asyn_rs::request::ParamSetValue>,
692 extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
693}
694
695impl ParamBatch {
696 fn empty() -> Self {
697 Self {
698 addr0: Vec::new(),
699 extra: std::collections::HashMap::new(),
700 }
701 }
702
703 fn merge(&mut self, other: ParamBatch) {
704 self.addr0.extend(other.addr0);
705 for (addr, updates) in other.extra {
706 self.extra.entry(addr).or_default().extend(updates);
707 }
708 }
709
710 async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
712 if !self.addr0.is_empty() {
713 if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
714 eprintln!("plugin param flush error (addr 0): {e}");
715 }
716 }
717 for (addr, updates) in self.extra {
718 if let Err(e) = port.set_params_and_notify(addr, updates).await {
719 eprintln!("plugin param flush error (addr {addr}): {e}");
720 }
721 }
722 }
723}
724
725#[allow(dead_code)]
727pub struct PluginPortDriver {
728 base: PortDriverBase,
729 ndarray_params: NDArrayDriverParams,
730 plugin_params: PluginBaseParams,
731 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
732 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
734 std_array_data_param: Option<usize>,
736}
737
738impl PluginPortDriver {
739 fn new<P: NDPluginProcess>(
740 port_name: &str,
741 plugin_type_name: &str,
742 queue_size: usize,
743 ndarray_port: &str,
744 max_addr: usize,
745 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
746 processor: &mut P,
747 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
748 ) -> AsynResult<Self> {
749 let mut base = PortDriverBase::new(
750 port_name,
751 max_addr,
752 PortFlags {
753 can_block: true,
754 ..Default::default()
755 },
756 );
757
758 let ndarray_params = NDArrayDriverParams::create(&mut base)?;
759 let plugin_params = PluginBaseParams::create(&mut base)?;
760
761 base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
763 base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
764 base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
765 base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
766 base.set_int32_param(plugin_params.queue_use, 0, 0)?;
767 base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
768 base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
769 base.set_int32_param(ndarray_params.write_file, 0, 0)?;
770 base.set_int32_param(ndarray_params.read_file, 0, 0)?;
771 base.set_int32_param(ndarray_params.capture, 0, 0)?;
772 base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
773 base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
774 base.set_string_param(ndarray_params.file_path, 0, "".into())?;
775 base.set_string_param(ndarray_params.file_name, 0, "".into())?;
776 base.set_int32_param(ndarray_params.file_number, 0, 0)?;
777 base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
778 base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
779 base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
780 base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
781 base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
782
783 base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
785 base.set_string_param(
786 ndarray_params.ad_core_version,
787 0,
788 env!("CARGO_PKG_VERSION").into(),
789 )?;
790 base.set_string_param(
791 ndarray_params.driver_version,
792 0,
793 env!("CARGO_PKG_VERSION").into(),
794 )?;
795 if !ndarray_port.is_empty() {
796 base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
797 }
798
799 let std_array_data_param = if array_data.is_some() {
801 Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
802 } else {
803 None
804 };
805
806 processor.register_params(&mut base)?;
808
809 Ok(Self {
810 base,
811 ndarray_params,
812 plugin_params,
813 param_change_tx,
814 array_data,
815 std_array_data_param,
816 })
817 }
818}
819
820fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
822 let n = src.len().min(dst.len());
823 dst[..n].copy_from_slice(&src[..n]);
824 n
825}
826
827fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
829where
830 S: CastToF64 + Copy,
831 D: CastFromF64 + Copy,
832{
833 let n = src.len().min(dst.len());
834 for i in 0..n {
835 dst[i] = D::cast_from_f64(src[i].cast_to_f64());
836 }
837 n
838}
839
840trait CastToF64 {
842 fn cast_to_f64(self) -> f64;
843}
844
845impl CastToF64 for i8 {
846 fn cast_to_f64(self) -> f64 {
847 self as f64
848 }
849}
850impl CastToF64 for u8 {
851 fn cast_to_f64(self) -> f64 {
852 self as f64
853 }
854}
855impl CastToF64 for i16 {
856 fn cast_to_f64(self) -> f64 {
857 self as f64
858 }
859}
860impl CastToF64 for u16 {
861 fn cast_to_f64(self) -> f64 {
862 self as f64
863 }
864}
865impl CastToF64 for i32 {
866 fn cast_to_f64(self) -> f64 {
867 self as f64
868 }
869}
870impl CastToF64 for u32 {
871 fn cast_to_f64(self) -> f64 {
872 self as f64
873 }
874}
875impl CastToF64 for i64 {
876 fn cast_to_f64(self) -> f64 {
877 self as f64
878 }
879}
880impl CastToF64 for u64 {
881 fn cast_to_f64(self) -> f64 {
882 self as f64
883 }
884}
885impl CastToF64 for f32 {
886 fn cast_to_f64(self) -> f64 {
887 self as f64
888 }
889}
890impl CastToF64 for f64 {
891 fn cast_to_f64(self) -> f64 {
892 self
893 }
894}
895
896trait CastFromF64 {
898 fn cast_from_f64(v: f64) -> Self;
899}
900
901impl CastFromF64 for i8 {
902 fn cast_from_f64(v: f64) -> Self {
903 v as i8
904 }
905}
906impl CastFromF64 for i16 {
907 fn cast_from_f64(v: f64) -> Self {
908 v as i16
909 }
910}
911impl CastFromF64 for i32 {
912 fn cast_from_f64(v: f64) -> Self {
913 v as i32
914 }
915}
916impl CastFromF64 for f32 {
917 fn cast_from_f64(v: f64) -> Self {
918 v as f32
919 }
920}
921impl CastFromF64 for f64 {
922 fn cast_from_f64(v: f64) -> Self {
923 v
924 }
925}
926
927macro_rules! impl_read_array {
930 ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
931 use crate::ndarray::NDDataBuffer;
932 let handle = match &$self.array_data {
933 Some(h) => h,
934 None => return Ok(0),
935 };
936 let guard = handle.lock();
937 let array = match &*guard {
938 Some(a) => a,
939 None => return Ok(0),
940 };
941 let n = match &array.data {
942 NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
943 $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
944 };
945 Ok(n)
946 }};
947}
948
949impl PortDriver for PluginPortDriver {
950 fn base(&self) -> &PortDriverBase {
951 &self.base
952 }
953
954 fn base_mut(&mut self) -> &mut PortDriverBase {
955 &mut self.base
956 }
957
958 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
959 let reason = user.reason;
960 let addr = user.addr;
961 self.base.set_int32_param(reason, addr, value)?;
962 self.base.call_param_callbacks(addr)?;
963 let _ = self
964 .param_change_tx
965 .try_send((reason, addr, ParamChangeValue::Int32(value)));
966 Ok(())
967 }
968
969 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
970 let reason = user.reason;
971 let addr = user.addr;
972 self.base.set_float64_param(reason, addr, value)?;
973 self.base.call_param_callbacks(addr)?;
974 let _ = self
975 .param_change_tx
976 .try_send((reason, addr, ParamChangeValue::Float64(value)));
977 Ok(())
978 }
979
980 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
981 let reason = user.reason;
982 let addr = user.addr;
983 let s = String::from_utf8_lossy(data).into_owned();
984 self.base.set_string_param(reason, addr, s.clone())?;
985 self.base.call_param_callbacks(addr)?;
986 let _ = self
987 .param_change_tx
988 .try_send((reason, addr, ParamChangeValue::Octet(s)));
989 Ok(())
990 }
991
992 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
993 impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
994 }
995
996 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
997 impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
998 }
999
1000 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1001 impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
1002 }
1003
1004 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1005 impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
1006 }
1007
1008 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1009 impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
1010 }
1011}
1012
1013#[derive(Clone)]
1015pub struct PluginRuntimeHandle {
1016 port_runtime: PortRuntimeHandle,
1017 array_sender: NDArraySender,
1018 array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
1019 port_name: String,
1020 pub ndarray_params: NDArrayDriverParams,
1021 pub plugin_params: PluginBaseParams,
1022}
1023
1024impl PluginRuntimeHandle {
1025 pub fn port_runtime(&self) -> &PortRuntimeHandle {
1026 &self.port_runtime
1027 }
1028
1029 pub fn array_sender(&self) -> &NDArraySender {
1030 &self.array_sender
1031 }
1032
1033 pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
1034 &self.array_output
1035 }
1036
1037 pub fn port_name(&self) -> &str {
1038 &self.port_name
1039 }
1040}
1041
1042pub fn create_plugin_runtime<P: NDPluginProcess>(
1049 port_name: &str,
1050 processor: P,
1051 pool: Arc<NDArrayPool>,
1052 queue_size: usize,
1053 ndarray_port: &str,
1054 wiring: Arc<WiringRegistry>,
1055) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1056 create_plugin_runtime_multi_addr(
1057 port_name,
1058 processor,
1059 pool,
1060 queue_size,
1061 ndarray_port,
1062 wiring,
1063 1,
1064 )
1065}
1066
1067pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1071 port_name: &str,
1072 mut processor: P,
1073 pool: Arc<NDArrayPool>,
1074 queue_size: usize,
1075 ndarray_port: &str,
1076 wiring: Arc<WiringRegistry>,
1077 max_addr: usize,
1078) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1079 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1081
1082 let plugin_type_name = processor.plugin_type().to_string();
1084 let array_data = processor.array_data_handle();
1085
1086 let driver = PluginPortDriver::new(
1088 port_name,
1089 &plugin_type_name,
1090 queue_size,
1091 ndarray_port,
1092 max_addr,
1093 param_tx,
1094 &mut processor,
1095 array_data,
1096 )
1097 .expect("failed to create plugin port driver");
1098
1099 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1100 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1101 let min_callback_time_reason = driver.plugin_params.min_callback_time;
1102 let sort_mode_reason = driver.plugin_params.sort_mode;
1103 let sort_time_reason = driver.plugin_params.sort_time;
1104 let sort_size_reason = driver.plugin_params.sort_size;
1105 let ndarray_params = driver.ndarray_params;
1106 let plugin_params = driver.plugin_params;
1107 let std_array_data_param = driver.std_array_data_param;
1108
1109 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1111
1112 let port_handle = port_runtime.port_handle().clone();
1114
1115 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1117
1118 let enabled = Arc::new(AtomicBool::new(false));
1120 let blocking_mode = Arc::new(AtomicBool::new(false));
1121
1122 let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1124 let array_output_for_handle = array_output.clone();
1125 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1126 processor,
1127 output: array_output,
1128 pool,
1129 ndarray_params,
1130 plugin_params,
1131 port_handle,
1132 array_counter: 0,
1133 std_array_data_param,
1134 min_callback_time: 0.0,
1135 last_process_time: None,
1136 sort_mode: 0,
1137 sort_time: 0.0,
1138 sort_size: 10,
1139 sort_buffer: SortBuffer::new(),
1140 }));
1141
1142 let data_enabled = enabled.clone();
1143 let data_blocking = blocking_mode.clone();
1144
1145 let mut array_sender = array_sender;
1146 array_sender.set_mode_flags(enabled, blocking_mode);
1147
1148 let nd_array_port_reason = plugin_params.nd_array_port;
1150 let sender_port_name = port_name.to_string();
1151 let initial_upstream = ndarray_port.to_string();
1152
1153 let data_jh = thread::Builder::new()
1155 .name(format!("plugin-data-{port_name}"))
1156 .spawn(move || {
1157 plugin_data_loop(
1158 shared,
1159 array_rx,
1160 param_rx,
1161 enable_callbacks_reason,
1162 blocking_callbacks_reason,
1163 min_callback_time_reason,
1164 sort_mode_reason,
1165 sort_time_reason,
1166 sort_size_reason,
1167 data_enabled,
1168 data_blocking,
1169 nd_array_port_reason,
1170 sender_port_name,
1171 initial_upstream,
1172 wiring,
1173 );
1174 })
1175 .expect("failed to spawn plugin data thread");
1176
1177 let handle = PluginRuntimeHandle {
1178 port_runtime,
1179 array_sender,
1180 array_output: array_output_for_handle,
1181 port_name: port_name.to_string(),
1182 ndarray_params,
1183 plugin_params,
1184 };
1185
1186 (handle, data_jh)
1187}
1188
1189fn plugin_data_loop<P: NDPluginProcess>(
1190 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1191 mut array_rx: NDArrayReceiver,
1192 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1193 enable_callbacks_reason: usize,
1194 blocking_callbacks_reason: usize,
1195 min_callback_time_reason: usize,
1196 sort_mode_reason: usize,
1197 sort_time_reason: usize,
1198 sort_size_reason: usize,
1199 enabled: Arc<AtomicBool>,
1200 blocking_mode: Arc<AtomicBool>,
1201 nd_array_port_reason: usize,
1202 sender_port_name: String,
1203 initial_upstream: String,
1204 wiring: Arc<WiringRegistry>,
1205) {
1206 let mut current_upstream = initial_upstream;
1207 let rt = tokio::runtime::Builder::new_current_thread()
1208 .enable_all()
1209 .build()
1210 .unwrap();
1211 rt.block_on(async {
1212 let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1215 let mut sort_flush_active = false;
1216
1217 loop {
1218 tokio::select! {
1219 msg = array_rx.recv_msg() => {
1220 match msg {
1221 Some(msg) => {
1222 let (process_output, senders, port) = {
1224 let mut guard = shared.lock();
1225 let output = guard.process_and_publish(&msg.array);
1226 let senders = guard.output.lock().senders_clone();
1227 let port = guard.port_handle.clone();
1228 (output, senders, port)
1229 };
1230 if let Some(po) = process_output {
1233 po.publish_arrays(&senders).await;
1234 po.batch.flush(&port).await;
1235 }
1236 }
1237 None => break,
1238 }
1239 }
1240 param = param_rx.recv() => {
1241 match param {
1242 Some((reason, addr, value)) => {
1243 if reason == enable_callbacks_reason {
1244 enabled.store(value.as_i32() != 0, Ordering::Release);
1245 }
1246 if reason == blocking_callbacks_reason {
1247 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1248 }
1249 if reason == min_callback_time_reason {
1251 shared.lock().min_callback_time = value.as_f64();
1252 }
1253 if reason == sort_mode_reason {
1255 let mode = value.as_i32();
1256 let flush_work = {
1259 let mut guard = shared.lock();
1260 guard.sort_mode = mode;
1261 if mode == 0 {
1262 let output = guard.flush_sort_buffer();
1263 let senders = guard.output.lock().senders_clone();
1264 let port = guard.port_handle.clone();
1265 sort_flush_active = false;
1266 Some((output, senders, port))
1267 } else {
1268 sort_flush_active = guard.sort_time > 0.0;
1269 if sort_flush_active {
1270 let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1271 sort_flush_interval = tokio::time::interval(dur);
1272 }
1273 None
1274 }
1275 };
1276 if let Some((output, senders, port)) = flush_work {
1277 output.publish_arrays(&senders).await;
1278 output.batch.flush(&port).await;
1279 }
1280 }
1281 if reason == sort_time_reason {
1282 let t = value.as_f64();
1283 let mut guard = shared.lock();
1284 guard.sort_time = t;
1285 if guard.sort_mode != 0 && t > 0.0 {
1286 sort_flush_active = true;
1287 let dur = std::time::Duration::from_secs_f64(t);
1288 sort_flush_interval = tokio::time::interval(dur);
1289 } else {
1290 sort_flush_active = false;
1291 }
1292 drop(guard);
1293 }
1294 if reason == sort_size_reason {
1295 shared.lock().sort_size = value.as_i32();
1296 }
1297 if reason == nd_array_port_reason {
1299 if let Some(new_port) = value.as_string() {
1300 if new_port != current_upstream {
1301 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1302 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1303 eprintln!("NDArrayPort rewire failed: {e}");
1304 current_upstream = old;
1305 }
1306 }
1307 }
1308 }
1309 let snapshot = PluginParamSnapshot {
1310 enable_callbacks: enabled.load(Ordering::Acquire),
1311 reason,
1312 addr,
1313 value,
1314 };
1315 let (process_output, senders, port) = {
1316 let mut guard = shared.lock();
1317 let t0 = std::time::Instant::now();
1318 let result = guard.processor.on_param_change(reason, &snapshot);
1319 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1320 let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1321 Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
1322 } else {
1323 None
1324 };
1325 let senders = guard.output.lock().senders_clone();
1326 (output, senders, guard.port_handle.clone())
1327 };
1328 if let Some(po) = process_output {
1329 po.publish_arrays(&senders).await;
1330 po.batch.flush(&port).await;
1331 }
1332 }
1333 None => break,
1334 }
1335 }
1336 _ = sort_flush_interval.tick(), if sort_flush_active => {
1337 let (output, senders, port) = {
1338 let mut guard = shared.lock();
1339 let output = guard.flush_sort_buffer();
1340 let senders = guard.output.lock().senders_clone();
1341 let port = guard.port_handle.clone();
1342 (output, senders, port)
1343 };
1344 output.publish_arrays(&senders).await;
1345 output.batch.flush(&port).await;
1346 }
1347 }
1348 }
1349 });
1350}
1351
1352pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1354 upstream.array_output().lock().add(downstream_sender);
1355}
1356
1357pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1359 port_name: &str,
1360 mut processor: P,
1361 pool: Arc<NDArrayPool>,
1362 queue_size: usize,
1363 output: NDArrayOutput,
1364 ndarray_port: &str,
1365 wiring: Arc<WiringRegistry>,
1366) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1367 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1368
1369 let plugin_type_name = processor.plugin_type().to_string();
1370 let array_data = processor.array_data_handle();
1371 let driver = PluginPortDriver::new(
1372 port_name,
1373 &plugin_type_name,
1374 queue_size,
1375 ndarray_port,
1376 1,
1377 param_tx,
1378 &mut processor,
1379 array_data,
1380 )
1381 .expect("failed to create plugin port driver");
1382
1383 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1384 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1385 let min_callback_time_reason = driver.plugin_params.min_callback_time;
1386 let sort_mode_reason = driver.plugin_params.sort_mode;
1387 let sort_time_reason = driver.plugin_params.sort_time;
1388 let sort_size_reason = driver.plugin_params.sort_size;
1389 let ndarray_params = driver.ndarray_params;
1390 let plugin_params = driver.plugin_params;
1391 let std_array_data_param = driver.std_array_data_param;
1392
1393 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1394
1395 let port_handle = port_runtime.port_handle().clone();
1396
1397 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1398
1399 let enabled = Arc::new(AtomicBool::new(false));
1400 let blocking_mode = Arc::new(AtomicBool::new(false));
1401
1402 let array_output = Arc::new(parking_lot::Mutex::new(output));
1403 let array_output_for_handle = array_output.clone();
1404 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1405 processor,
1406 output: array_output,
1407 pool,
1408 ndarray_params,
1409 plugin_params,
1410 port_handle,
1411 array_counter: 0,
1412 std_array_data_param,
1413 min_callback_time: 0.0,
1414 last_process_time: None,
1415 sort_mode: 0,
1416 sort_time: 0.0,
1417 sort_size: 10,
1418 sort_buffer: SortBuffer::new(),
1419 }));
1420
1421 let data_enabled = enabled.clone();
1422 let data_blocking = blocking_mode.clone();
1423
1424 let mut array_sender = array_sender;
1425 array_sender.set_mode_flags(enabled, blocking_mode);
1426
1427 let nd_array_port_reason = plugin_params.nd_array_port;
1429 let sender_port_name = port_name.to_string();
1430 let initial_upstream = ndarray_port.to_string();
1431
1432 let data_jh = thread::Builder::new()
1433 .name(format!("plugin-data-{port_name}"))
1434 .spawn(move || {
1435 plugin_data_loop(
1436 shared,
1437 array_rx,
1438 param_rx,
1439 enable_callbacks_reason,
1440 blocking_callbacks_reason,
1441 min_callback_time_reason,
1442 sort_mode_reason,
1443 sort_time_reason,
1444 sort_size_reason,
1445 data_enabled,
1446 data_blocking,
1447 nd_array_port_reason,
1448 sender_port_name,
1449 initial_upstream,
1450 wiring,
1451 );
1452 })
1453 .expect("failed to spawn plugin data thread");
1454
1455 let handle = PluginRuntimeHandle {
1456 port_runtime,
1457 array_sender,
1458 array_output: array_output_for_handle,
1459 port_name: port_name.to_string(),
1460 ndarray_params,
1461 plugin_params,
1462 };
1463
1464 (handle, data_jh)
1465}
1466
1467#[cfg(test)]
1468mod tests {
1469 use super::*;
1470 use crate::ndarray::{NDDataType, NDDimension};
1471 use crate::plugin::channel::ndarray_channel;
1472
1473 struct PassthroughProcessor;
1475
1476 impl NDPluginProcess for PassthroughProcessor {
1477 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1478 ProcessResult::arrays(vec![Arc::new(array.clone())])
1479 }
1480 fn plugin_type(&self) -> &str {
1481 "Passthrough"
1482 }
1483 }
1484
1485 struct SinkProcessor {
1487 count: usize,
1488 }
1489
1490 impl NDPluginProcess for SinkProcessor {
1491 fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1492 self.count += 1;
1493 ProcessResult::empty()
1494 }
1495 fn plugin_type(&self) -> &str {
1496 "Sink"
1497 }
1498 }
1499
1500 fn make_test_array(id: i32) -> Arc<NDArray> {
1501 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1502 arr.unique_id = id;
1503 Arc::new(arr)
1504 }
1505
1506 fn test_wiring() -> Arc<WiringRegistry> {
1507 Arc::new(WiringRegistry::new())
1508 }
1509
1510 fn enable_callbacks(handle: &PluginRuntimeHandle) {
1512 handle
1513 .port_runtime()
1514 .port_handle()
1515 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1516 .unwrap();
1517 std::thread::sleep(std::time::Duration::from_millis(10));
1518 }
1519
1520 fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
1524 let sender = sender.clone();
1525 let jh = std::thread::spawn(move || {
1526 let rt = tokio::runtime::Builder::new_current_thread()
1527 .enable_all()
1528 .build()
1529 .unwrap();
1530 rt.block_on(sender.publish(array));
1531 });
1532 jh.join().unwrap();
1533 }
1534
1535 #[test]
1536 fn test_passthrough_runtime() {
1537 let pool = Arc::new(NDArrayPool::new(1_000_000));
1538
1539 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1541 let mut output = NDArrayOutput::new();
1542 output.add(downstream_sender);
1543
1544 let (handle, _data_jh) = create_plugin_runtime_with_output(
1545 "PASS1",
1546 PassthroughProcessor,
1547 pool,
1548 10,
1549 output,
1550 "",
1551 test_wiring(),
1552 );
1553 enable_callbacks(&handle);
1554
1555 send_array(handle.array_sender(), make_test_array(42));
1557
1558 let received = downstream_rx.blocking_recv().unwrap();
1560 assert_eq!(received.unique_id, 42);
1561 }
1562
1563 #[test]
1564 fn test_sink_runtime() {
1565 let pool = Arc::new(NDArrayPool::new(1_000_000));
1566
1567 let (handle, _data_jh) = create_plugin_runtime(
1568 "SINK1",
1569 SinkProcessor { count: 0 },
1570 pool,
1571 10,
1572 "",
1573 test_wiring(),
1574 );
1575 enable_callbacks(&handle);
1576
1577 send_array(handle.array_sender(), make_test_array(1));
1579 send_array(handle.array_sender(), make_test_array(2));
1580
1581 std::thread::sleep(std::time::Duration::from_millis(50));
1583
1584 assert_eq!(handle.port_name(), "SINK1");
1586 }
1587
1588 #[test]
1589 fn test_plugin_type_param() {
1590 let pool = Arc::new(NDArrayPool::new(1_000_000));
1591
1592 let (handle, _data_jh) = create_plugin_runtime(
1593 "TYPE_TEST",
1594 PassthroughProcessor,
1595 pool,
1596 10,
1597 "",
1598 test_wiring(),
1599 );
1600
1601 assert_eq!(handle.port_name(), "TYPE_TEST");
1603 assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1604 }
1605
1606 #[test]
1607 fn test_shutdown_on_handle_drop() {
1608 let pool = Arc::new(NDArrayPool::new(1_000_000));
1609
1610 let (handle, data_jh) = create_plugin_runtime(
1611 "SHUTDOWN_TEST",
1612 PassthroughProcessor,
1613 pool,
1614 10,
1615 "",
1616 test_wiring(),
1617 );
1618
1619 let sender = handle.array_sender().clone();
1621 drop(handle);
1622 drop(sender);
1623
1624 let result = data_jh.join();
1626 assert!(result.is_ok());
1627 }
1628
1629 #[test]
1630 fn test_nonblocking_passthrough() {
1631 let pool = Arc::new(NDArrayPool::new(1_000_000));
1632 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1633 let mut output = NDArrayOutput::new();
1634 output.add(downstream_sender);
1635
1636 let (handle, _data_jh) = create_plugin_runtime_with_output(
1637 "NB_TEST",
1638 PassthroughProcessor,
1639 pool,
1640 10,
1641 output,
1642 "",
1643 test_wiring(),
1644 );
1645 enable_callbacks(&handle);
1646
1647 send_array(handle.array_sender(), make_test_array(42));
1648
1649 let received = downstream_rx.blocking_recv().unwrap();
1650 assert_eq!(received.unique_id, 42);
1651 }
1652
1653 #[test]
1654 fn test_blocking_to_nonblocking_switch() {
1655 let pool = Arc::new(NDArrayPool::new(1_000_000));
1656 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1657 let mut output = NDArrayOutput::new();
1658 output.add(downstream_sender);
1659
1660 let (handle, _data_jh) = create_plugin_runtime_with_output(
1661 "SWITCH_TEST",
1662 PassthroughProcessor,
1663 pool,
1664 10,
1665 output,
1666 "",
1667 test_wiring(),
1668 );
1669 enable_callbacks(&handle);
1670
1671 handle
1673 .port_runtime()
1674 .port_handle()
1675 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1676 .unwrap();
1677 std::thread::sleep(std::time::Duration::from_millis(50));
1678
1679 send_array(handle.array_sender(), make_test_array(1));
1680 let received = downstream_rx.blocking_recv().unwrap();
1681 assert_eq!(received.unique_id, 1);
1682
1683 handle
1685 .port_runtime()
1686 .port_handle()
1687 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1688 .unwrap();
1689 std::thread::sleep(std::time::Duration::from_millis(50));
1690
1691 send_array(handle.array_sender(), make_test_array(2));
1693 let received = downstream_rx.blocking_recv().unwrap();
1694 assert_eq!(received.unique_id, 2);
1695 }
1696
1697 #[test]
1698 fn test_enable_callbacks_disables_processing() {
1699 let pool = Arc::new(NDArrayPool::new(1_000_000));
1700 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1701 let mut output = NDArrayOutput::new();
1702 output.add(downstream_sender);
1703
1704 let (handle, _data_jh) = create_plugin_runtime_with_output(
1705 "ENABLE_TEST",
1706 PassthroughProcessor,
1707 pool,
1708 10,
1709 output,
1710 "",
1711 test_wiring(),
1712 );
1713
1714 handle
1716 .port_runtime()
1717 .port_handle()
1718 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1719 .unwrap();
1720 std::thread::sleep(std::time::Duration::from_millis(50));
1721
1722 send_array(handle.array_sender(), make_test_array(99));
1724
1725 let rt = tokio::runtime::Builder::new_current_thread()
1727 .enable_all()
1728 .build()
1729 .unwrap();
1730 let result = rt.block_on(async {
1731 tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1732 });
1733 assert!(
1734 result.is_err(),
1735 "should not receive array when callbacks disabled"
1736 );
1737 }
1738
1739 #[test]
1740 fn test_downstream_receives_multiple() {
1741 let pool = Arc::new(NDArrayPool::new(1_000_000));
1742
1743 let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1744 let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1745 let mut output = NDArrayOutput::new();
1746 output.add(ds1);
1747 output.add(ds2);
1748
1749 let (handle, _data_jh) = create_plugin_runtime_with_output(
1750 "DS_TEST",
1751 PassthroughProcessor,
1752 pool,
1753 10,
1754 output,
1755 "",
1756 test_wiring(),
1757 );
1758 enable_callbacks(&handle);
1759
1760 send_array(handle.array_sender(), make_test_array(77));
1761
1762 let r1 = rx1.blocking_recv().unwrap();
1764 let r2 = rx2.blocking_recv().unwrap();
1765 assert_eq!(r1.unique_id, 77);
1766 assert_eq!(r2.unique_id, 77);
1767 }
1768
1769 #[test]
1770 fn test_param_updates_after_send() {
1771 let pool = Arc::new(NDArrayPool::new(1_000_000));
1772
1773 struct ParamTracker;
1774 impl NDPluginProcess for ParamTracker {
1775 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1776 ProcessResult::arrays(vec![Arc::new(array.clone())])
1777 }
1778 fn plugin_type(&self) -> &str {
1779 "ParamTracker"
1780 }
1781 }
1782
1783 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1784 let mut output = NDArrayOutput::new();
1785 output.add(downstream_sender);
1786
1787 let (handle, _data_jh) = create_plugin_runtime_with_output(
1788 "PARAM_TEST",
1789 ParamTracker,
1790 pool,
1791 10,
1792 output,
1793 "",
1794 test_wiring(),
1795 );
1796 enable_callbacks(&handle);
1797
1798 send_array(handle.array_sender(), make_test_array(1));
1800 let received = downstream_rx.blocking_recv().unwrap();
1801 assert_eq!(received.unique_id, 1);
1802
1803 handle
1805 .port_runtime()
1806 .port_handle()
1807 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1808 .unwrap();
1809 std::thread::sleep(std::time::Duration::from_millis(50));
1810
1811 send_array(handle.array_sender(), make_test_array(2));
1813 let received = downstream_rx.blocking_recv().unwrap();
1814 assert_eq!(received.unique_id, 2);
1815 }
1816
1817 #[test]
1818 fn test_sort_buffer_reorders_by_unique_id() {
1819 let mut buf = SortBuffer::new();
1820
1821 buf.insert(3, vec![make_test_array(3)], 10);
1823 buf.insert(1, vec![make_test_array(1)], 10);
1824 buf.insert(2, vec![make_test_array(2)], 10);
1825
1826 assert_eq!(buf.len(), 3);
1827
1828 let drained = buf.drain_all();
1829 let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1830 assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
1831 assert_eq!(buf.len(), 0);
1832 assert_eq!(buf.last_emitted_id, 3);
1833 }
1834
1835 #[test]
1836 fn test_sort_buffer_detects_disordered() {
1837 let mut buf = SortBuffer::new();
1838
1839 buf.insert(5, vec![make_test_array(5)], 10);
1841 buf.drain_all(); buf.insert(3, vec![make_test_array(3)], 10);
1844 assert_eq!(buf.disordered_arrays, 1);
1845 }
1846
1847 #[test]
1848 fn test_sort_buffer_drops_when_full() {
1849 let mut buf = SortBuffer::new();
1850
1851 buf.insert(1, vec![make_test_array(1)], 2);
1853 buf.insert(2, vec![make_test_array(2)], 2);
1854 buf.insert(3, vec![make_test_array(3)], 2);
1855
1856 assert_eq!(buf.len(), 2);
1858 assert_eq!(buf.dropped_output_arrays, 1);
1859
1860 let drained = buf.drain_all();
1861 let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
1862 assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
1863 }
1864
1865 #[test]
1866 fn test_sort_mode_runtime_integration() {
1867 let pool = Arc::new(NDArrayPool::new(1_000_000));
1868 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1869 let mut output = NDArrayOutput::new();
1870 output.add(downstream_sender);
1871
1872 let (handle, _data_jh) = create_plugin_runtime_with_output(
1873 "SORT_TEST",
1874 PassthroughProcessor,
1875 pool,
1876 10,
1877 output,
1878 "",
1879 test_wiring(),
1880 );
1881 enable_callbacks(&handle);
1882
1883 handle
1885 .port_runtime()
1886 .port_handle()
1887 .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
1888 .unwrap();
1889 handle
1890 .port_runtime()
1891 .port_handle()
1892 .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
1893 .unwrap();
1894 std::thread::sleep(std::time::Duration::from_millis(50));
1895
1896 send_array(handle.array_sender(), make_test_array(3));
1898 send_array(handle.array_sender(), make_test_array(1));
1899 send_array(handle.array_sender(), make_test_array(2));
1900 std::thread::sleep(std::time::Duration::from_millis(100));
1901
1902 let rt = tokio::runtime::Builder::new_current_thread()
1904 .enable_all()
1905 .build()
1906 .unwrap();
1907 let result = rt.block_on(async {
1908 tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
1909 });
1910 assert!(
1911 result.is_err(),
1912 "arrays should be buffered while sort mode is active"
1913 );
1914
1915 handle
1917 .port_runtime()
1918 .port_handle()
1919 .write_int32_blocking(handle.plugin_params.sort_mode, 0, 0)
1920 .unwrap();
1921 std::thread::sleep(std::time::Duration::from_millis(100));
1922
1923 let r1 = downstream_rx.blocking_recv().unwrap();
1925 let r2 = downstream_rx.blocking_recv().unwrap();
1926 let r3 = downstream_rx.blocking_recv().unwrap();
1927 assert_eq!(r1.unique_id, 1);
1928 assert_eq!(r2.unique_id, 2);
1929 assert_eq!(r3.unique_id, 3);
1930 }
1931}