1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
19use super::params::PluginBaseParams;
20use super::wiring::WiringRegistry;
21
/// Payload of a single parameter write, tagged with the kind of value that was written.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// A 32-bit integer write.
    Int32(i32),
    /// A double-precision floating-point write.
    Float64(f64),
    /// A text write.
    Octet(String),
}

impl ParamChangeValue {
    /// Returns the payload as an `i32`.
    ///
    /// Floating-point payloads go through an `as` cast (fraction discarded, out-of-range
    /// values saturate, NaN becomes 0). Text payloads yield `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(number) => number,
            Self::Float64(number) => number as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Returns the payload as an `f64`; text payloads yield `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(number) => number as f64,
            Self::Float64(number) => number,
            Self::Octet(_) => 0.0,
        }
    }

    /// Returns the text payload, or `None` when the value is numeric.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
54
/// A parameter value a processor wants written back to the port, together with the
/// parameter index (`reason`) and the address it applies to.
pub enum ParamUpdate {
    /// Integer parameter update.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// Floating-point parameter update.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// Text parameter update.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    /// Floating-point array parameter update.
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Integer update targeting address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Floating-point update targeting address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Integer update targeting an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Floating-point update targeting an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// Floating-point array update targeting address 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Floating-point array update targeting an explicit address.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array {
            reason,
            addr,
            value,
        }
    }
}
129
130pub struct ProcessResult {
132 pub output_arrays: Vec<Arc<NDArray>>,
133 pub param_updates: Vec<ParamUpdate>,
134 pub scatter_index: Option<usize>,
136}
137
138impl ProcessResult {
139 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
141 Self {
142 output_arrays: vec![],
143 param_updates,
144 scatter_index: None,
145 }
146 }
147
148 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
150 Self {
151 output_arrays,
152 param_updates: vec![],
153 scatter_index: None,
154 }
155 }
156
157 pub fn empty() -> Self {
159 Self {
160 output_arrays: vec![],
161 param_updates: vec![],
162 scatter_index: None,
163 }
164 }
165
166 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
168 Self {
169 output_arrays,
170 param_updates: vec![],
171 scatter_index: Some(index),
172 }
173 }
174}
175
176pub struct ParamChangeResult {
178 pub output_arrays: Vec<Arc<NDArray>>,
179 pub param_updates: Vec<ParamUpdate>,
180}
181
182impl ParamChangeResult {
183 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
184 Self {
185 output_arrays: vec![],
186 param_updates,
187 }
188 }
189
190 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
191 Self {
192 output_arrays,
193 param_updates: vec![],
194 }
195 }
196
197 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
198 Self {
199 output_arrays,
200 param_updates,
201 }
202 }
203
204 pub fn empty() -> Self {
205 Self {
206 output_arrays: vec![],
207 param_updates: vec![],
208 }
209 }
210}
211
/// Processing logic hosted by the plugin runtime.
///
/// The runtime owns the implementor and calls it from the plugin data thread, which is
/// why the trait requires `Send + 'static`.
pub trait NDPluginProcess: Send + 'static {
    /// Handles one input array and returns the arrays and parameter updates to publish.
    ///
    /// `pool` is the array pool the runtime was created with.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Name of the plugin type; the runtime stores it in the port's plugin-type parameter.
    fn plugin_type(&self) -> &str;

    /// Hook for registering processor-specific parameters on the port while the driver
    /// is being constructed. The default implementation registers nothing.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called for every parameter write forwarded to the data loop.
    ///
    /// `_reason` is the index of the written parameter and `_params` carries the written
    /// value and address. The default implementation ignores the change.
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Optional shared slot holding an array that the port driver serves to array reads.
    ///
    /// Returning `Some` also makes the driver create the `STD_ARRAY_DATA` parameter.
    /// NOTE(review): presumably the processor itself stores the latest array in this
    /// slot — confirm against implementors. The default is `None`.
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
245
/// Description of one parameter write, passed to `NDPluginProcess::on_param_change`.
pub struct PluginParamSnapshot {
    /// State of the enable-callbacks flag at the time the snapshot was taken.
    pub enable_callbacks: bool,
    /// Index of the parameter that was written.
    pub reason: usize,
    /// Address the write was directed at.
    pub addr: i32,
    /// The value that was written.
    pub value: ParamChangeValue,
}
256
257struct SortBuffer {
263 entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
265 last_emitted_id: i32,
267 disordered_arrays: i32,
269 dropped_output_arrays: i32,
271}
272
273impl SortBuffer {
274 fn new() -> Self {
275 Self {
276 entries: BTreeMap::new(),
277 last_emitted_id: 0,
278 disordered_arrays: 0,
279 dropped_output_arrays: 0,
280 }
281 }
282
283 fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
285 if unique_id < self.last_emitted_id {
286 self.disordered_arrays += 1;
287 }
288 self.entries.entry(unique_id).or_default().extend(arrays);
289
290 while sort_size > 0 && self.entries.len() as i32 > sort_size {
292 if let Some((&oldest_key, _)) = self.entries.iter().next() {
293 self.entries.remove(&oldest_key);
294 self.dropped_output_arrays += 1;
295 }
296 }
297 }
298
299 fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
301 let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
302 if let Some(&(last_id, _)) = entries.last() {
303 self.last_emitted_id = last_id;
304 }
305 entries
306 }
307
308 fn len(&self) -> i32 {
310 self.entries.len() as i32
311 }
312}
313
314struct SharedProcessorInner<P: NDPluginProcess> {
317 processor: P,
318 output: Arc<parking_lot::Mutex<NDArrayOutput>>,
319 pool: Arc<NDArrayPool>,
320 ndarray_params: NDArrayDriverParams,
321 plugin_params: PluginBaseParams,
322 port_handle: PortHandle,
323 array_counter: i32,
324 std_array_data_param: Option<usize>,
326 min_callback_time: f64,
328 last_process_time: Option<std::time::Instant>,
330 sort_mode: i32,
332 sort_time: f64,
334 sort_size: i32,
336 sort_buffer: SortBuffer,
338}
339
340impl<P: NDPluginProcess> SharedProcessorInner<P> {
341 fn should_throttle(&self) -> bool {
342 if self.min_callback_time <= 0.0 {
343 return false;
344 }
345 if let Some(last) = self.last_process_time {
346 last.elapsed().as_secs_f64() < self.min_callback_time
347 } else {
348 false
349 }
350 }
351
352 fn process_and_publish(&mut self, array: &NDArray) -> Option<ProcessOutput> {
356 if self.should_throttle() {
357 return None;
358 }
359 let t0 = std::time::Instant::now();
360 let result = self.processor.process_array(array, &self.pool);
361 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
362 self.last_process_time = Some(t0);
363
364 if self.sort_mode != 0 && !result.output_arrays.is_empty() {
365 let unique_id = array.unique_id;
366 self.sort_buffer
367 .insert(unique_id, result.output_arrays, self.sort_size);
368 let mut batch = self.build_sort_params_batch();
369 if !result.param_updates.is_empty() {
370 let frame_output = self.build_publish_batch(
371 vec![],
372 result.param_updates,
373 result.scatter_index,
374 Some(array),
375 elapsed_ms,
376 );
377 batch.merge(frame_output.batch);
378 }
379 Some(ProcessOutput {
380 arrays: vec![],
381 scatter_index: None,
382 batch,
383 })
384 } else {
385 Some(self.build_publish_batch(
386 result.output_arrays,
387 result.param_updates,
388 result.scatter_index,
389 Some(array),
390 elapsed_ms,
391 ))
392 }
393 }
394
395 fn flush_sort_buffer(&mut self) -> ProcessOutput {
397 let entries = self.sort_buffer.drain_all();
398 let mut all_arrays = Vec::new();
399 let mut combined = ParamBatch::empty();
400 for (_unique_id, arrays) in entries {
401 let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
402 all_arrays.extend(output.arrays);
403 combined.merge(output.batch);
404 }
405 combined.merge(self.build_sort_params_batch());
406 ProcessOutput {
407 arrays: all_arrays,
408 scatter_index: None,
409 batch: combined,
410 }
411 }
412
413 fn build_sort_params_batch(&self) -> ParamBatch {
414 use asyn_rs::request::ParamSetValue;
415 let sort_free = self.sort_size - self.sort_buffer.len();
416 ParamBatch {
417 addr0: vec![
418 ParamSetValue::Int32 {
419 reason: self.plugin_params.sort_free,
420 addr: 0,
421 value: sort_free,
422 },
423 ParamSetValue::Int32 {
424 reason: self.plugin_params.disordered_arrays,
425 addr: 0,
426 value: self.sort_buffer.disordered_arrays,
427 },
428 ParamSetValue::Int32 {
429 reason: self.plugin_params.dropped_output_arrays,
430 addr: 0,
431 value: self.sort_buffer.dropped_output_arrays,
432 },
433 ],
434 extra: std::collections::HashMap::new(),
435 }
436 }
437
438 fn build_publish_batch(
442 &mut self,
443 output_arrays: Vec<Arc<NDArray>>,
444 param_updates: Vec<ParamUpdate>,
445 scatter_index: Option<usize>,
446 fallback_array: Option<&NDArray>,
447 elapsed_ms: f64,
448 ) -> ProcessOutput {
449 use asyn_rs::request::ParamSetValue;
450
451 let mut addr0: Vec<ParamSetValue> = Vec::new();
452 let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
453 std::collections::HashMap::new();
454
455 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
456 self.array_counter += 1;
457
458 if let Some(param) = self.std_array_data_param {
460 use crate::ndarray::NDDataBuffer;
461 use asyn_rs::param::ParamValue;
462 let value = match &report_arr.data {
463 NDDataBuffer::I8(v) => {
464 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
465 }
466 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
467 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
468 ))),
469 NDDataBuffer::I16(v) => {
470 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
471 }
472 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
473 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
474 ))),
475 NDDataBuffer::I32(v) => {
476 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
477 }
478 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
479 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
480 ))),
481 NDDataBuffer::I64(v) => {
482 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
483 }
484 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
485 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
486 ))),
487 NDDataBuffer::F32(v) => {
488 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
489 }
490 NDDataBuffer::F64(v) => {
491 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
492 }
493 };
494 if let Some(value) = value {
495 let ts = report_arr.timestamp.to_system_time();
496 self.port_handle
497 .interrupts()
498 .notify(asyn_rs::interrupt::InterruptValue {
499 reason: param,
500 addr: 0,
501 value,
502 timestamp: ts,
503 uint32_changed_mask: 0,
504 ..Default::default()
505 });
506 }
507 }
508
509 let info = report_arr.info();
510 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
511 addr0.extend([
512 ParamSetValue::Int32 {
513 reason: self.ndarray_params.array_counter,
514 addr: 0,
515 value: self.array_counter,
516 },
517 ParamSetValue::Int32 {
518 reason: self.ndarray_params.unique_id,
519 addr: 0,
520 value: report_arr.unique_id,
521 },
522 ParamSetValue::Int32 {
523 reason: self.ndarray_params.n_dimensions,
524 addr: 0,
525 value: report_arr.dims.len() as i32,
526 },
527 ParamSetValue::Int32 {
528 reason: self.ndarray_params.array_size_x,
529 addr: 0,
530 value: info.x_size as i32,
531 },
532 ParamSetValue::Int32 {
533 reason: self.ndarray_params.array_size_y,
534 addr: 0,
535 value: info.y_size as i32,
536 },
537 ParamSetValue::Int32 {
538 reason: self.ndarray_params.array_size_z,
539 addr: 0,
540 value: info.color_size as i32,
541 },
542 ParamSetValue::Int32 {
543 reason: self.ndarray_params.array_size,
544 addr: 0,
545 value: info.total_bytes as i32,
546 },
547 ParamSetValue::Int32 {
548 reason: self.ndarray_params.data_type,
549 addr: 0,
550 value: report_arr.data.data_type() as i32,
551 },
552 ParamSetValue::Int32 {
553 reason: self.ndarray_params.color_mode,
554 addr: 0,
555 value: color_mode,
556 },
557 ParamSetValue::Float64 {
558 reason: self.ndarray_params.timestamp_rbv,
559 addr: 0,
560 value: report_arr.timestamp.as_f64(),
561 },
562 ParamSetValue::Int32 {
563 reason: self.ndarray_params.epics_ts_sec,
564 addr: 0,
565 value: report_arr.timestamp.sec as i32,
566 },
567 ParamSetValue::Int32 {
568 reason: self.ndarray_params.epics_ts_nsec,
569 addr: 0,
570 value: report_arr.timestamp.nsec as i32,
571 },
572 ]);
573 }
574
575 addr0.push(ParamSetValue::Float64 {
576 reason: self.plugin_params.execution_time,
577 addr: 0,
578 value: elapsed_ms,
579 });
580
581 for update in ¶m_updates {
586 match update {
587 ParamUpdate::Int32 {
588 reason,
589 addr,
590 value,
591 } => {
592 let pv = ParamSetValue::Int32 {
593 reason: *reason,
594 addr: *addr,
595 value: *value,
596 };
597 if *addr == 0 {
598 addr0.push(pv);
599 } else {
600 extra.entry(*addr).or_default().push(pv);
601 }
602 }
603 ParamUpdate::Float64 {
604 reason,
605 addr,
606 value,
607 } => {
608 let pv = ParamSetValue::Float64 {
609 reason: *reason,
610 addr: *addr,
611 value: *value,
612 };
613 if *addr == 0 {
614 addr0.push(pv);
615 } else {
616 extra.entry(*addr).or_default().push(pv);
617 }
618 }
619 ParamUpdate::Octet {
620 reason,
621 addr,
622 value,
623 } => {
624 let pv = ParamSetValue::Octet {
625 reason: *reason,
626 addr: *addr,
627 value: value.clone(),
628 };
629 if *addr == 0 {
630 addr0.push(pv);
631 } else {
632 extra.entry(*addr).or_default().push(pv);
633 }
634 }
635 ParamUpdate::Float64Array {
636 reason,
637 addr,
638 value,
639 } => {
640 let pv = ParamSetValue::Float64Array {
641 reason: *reason,
642 addr: *addr,
643 value: value.clone(),
644 };
645 if *addr == 0 {
646 addr0.push(pv);
647 } else {
648 extra.entry(*addr).or_default().push(pv);
649 }
650 }
651 }
652 }
653
654 ProcessOutput {
655 arrays: output_arrays,
656 scatter_index,
657 batch: ParamBatch { addr0, extra },
658 }
659 }
660}
661
662struct ProcessOutput {
664 arrays: Vec<Arc<NDArray>>,
665 scatter_index: Option<usize>,
666 batch: ParamBatch,
667}
668
669impl ProcessOutput {
670 async fn publish_arrays(&self, senders: &[NDArraySender]) {
676 for arr in &self.arrays {
677 if let Some(idx) = self.scatter_index {
678 if let Some(sender) = senders.get(idx % senders.len().max(1)) {
679 sender.publish(arr.clone()).await;
680 }
681 } else {
682 let futs = senders.iter().map(|s| s.publish(arr.clone()));
683 futures_util::future::join_all(futs).await;
684 }
685 }
686 }
687}
688
689struct ParamBatch {
692 addr0: Vec<asyn_rs::request::ParamSetValue>,
693 extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
694}
695
696impl ParamBatch {
697 fn empty() -> Self {
698 Self {
699 addr0: Vec::new(),
700 extra: std::collections::HashMap::new(),
701 }
702 }
703
704 fn merge(&mut self, other: ParamBatch) {
705 self.addr0.extend(other.addr0);
706 for (addr, updates) in other.extra {
707 self.extra.entry(addr).or_default().extend(updates);
708 }
709 }
710
711 async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
713 if !self.addr0.is_empty() {
714 if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
715 eprintln!("plugin param flush error (addr 0): {e}");
716 }
717 }
718 for (addr, updates) in self.extra {
719 if let Err(e) = port.set_params_and_notify(addr, updates).await {
720 eprintln!("plugin param flush error (addr {addr}): {e}");
721 }
722 }
723 }
724}
725
726#[allow(dead_code)]
728pub struct PluginPortDriver {
729 base: PortDriverBase,
730 ndarray_params: NDArrayDriverParams,
731 plugin_params: PluginBaseParams,
732 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
733 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
735 std_array_data_param: Option<usize>,
737}
738
739impl PluginPortDriver {
740 fn new<P: NDPluginProcess>(
741 port_name: &str,
742 plugin_type_name: &str,
743 queue_size: usize,
744 ndarray_port: &str,
745 max_addr: usize,
746 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
747 processor: &mut P,
748 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
749 ) -> AsynResult<Self> {
750 let mut base = PortDriverBase::new(
751 port_name,
752 max_addr,
753 PortFlags {
754 can_block: true,
755 ..Default::default()
756 },
757 );
758
759 let ndarray_params = NDArrayDriverParams::create(&mut base)?;
760 let plugin_params = PluginBaseParams::create(&mut base)?;
761
762 base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
764 base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
765 base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
766 base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
767 base.set_int32_param(plugin_params.queue_use, 0, 0)?;
768 base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
769 base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
770 base.set_int32_param(ndarray_params.write_file, 0, 0)?;
771 base.set_int32_param(ndarray_params.read_file, 0, 0)?;
772 base.set_int32_param(ndarray_params.capture, 0, 0)?;
773 base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
774 base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
775 base.set_string_param(ndarray_params.file_path, 0, "".into())?;
776 base.set_string_param(ndarray_params.file_name, 0, "".into())?;
777 base.set_int32_param(ndarray_params.file_number, 0, 0)?;
778 base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
779 base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
780 base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
781 base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
782 base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
783
784 base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
786 base.set_string_param(
787 ndarray_params.ad_core_version,
788 0,
789 env!("CARGO_PKG_VERSION").into(),
790 )?;
791 base.set_string_param(
792 ndarray_params.driver_version,
793 0,
794 env!("CARGO_PKG_VERSION").into(),
795 )?;
796 if !ndarray_port.is_empty() {
797 base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
798 }
799
800 let std_array_data_param = if array_data.is_some() {
802 Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
803 } else {
804 None
805 };
806
807 processor.register_params(&mut base)?;
809
810 Ok(Self {
811 base,
812 ndarray_params,
813 plugin_params,
814 param_change_tx,
815 array_data,
816 std_array_data_param,
817 })
818 }
819}
820
/// Copies as many leading elements of `src` as fit into `dst` and returns that count.
///
/// Elements of `dst` beyond the copied prefix are left untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = usize::min(src.len(), dst.len());
    let (target, _untouched) = dst.split_at_mut(count);
    target.copy_from_slice(&src[..count]);
    count
}
827
828fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
830where
831 S: CastToF64 + Copy,
832 D: CastFromF64 + Copy,
833{
834 let n = src.len().min(dst.len());
835 for i in 0..n {
836 dst[i] = D::cast_from_f64(src[i].cast_to_f64());
837 }
838 n
839}
840
/// Numeric element types that can be widened to `f64` with an `as` cast.
trait CastToF64 {
    /// Returns the value converted to `f64`.
    fn cast_to_f64(self) -> f64;
}

/// Implements `CastToF64` with a plain `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64 {
    ($($ty:ty),+ $(,)?) => {
        $(
            impl CastToF64 for $ty {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )+
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

impl CastToF64 for f64 {
    fn cast_to_f64(self) -> f64 {
        self
    }
}
896
/// Numeric element types that can be produced from an `f64` with an `as` cast.
trait CastFromF64 {
    /// Converts `v` to `Self`; for integer targets the cast truncates toward zero,
    /// saturates at the type's bounds and maps NaN to 0.
    fn cast_from_f64(v: f64) -> Self;
}

/// Implements `CastFromF64` with a plain `as` cast for each listed type.
macro_rules! impl_cast_from_f64 {
    ($($ty:ty),+ $(,)?) => {
        $(
            impl CastFromF64 for $ty {
                fn cast_from_f64(v: f64) -> Self {
                    v as $ty
                }
            }
        )+
    };
}

impl_cast_from_f64!(i8, i16, i32, f32);

impl CastFromF64 for f64 {
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
927
/// Body shared by the typed array read methods.
///
/// Copies the array currently stored in `$self.array_data` into `$buf` and evaluates
/// to `Ok(count)`. `$direct_variant` is the buffer variant whose element type matches
/// `$buf` and is copied as-is; every listed `$variant` is converted element by element.
/// Returns `Ok(0)` from the enclosing function when there is no slot or no stored array.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        let Some(slot) = &$self.array_data else {
            return Ok(0);
        };
        // The lock is held for the duration of the copy.
        let stored = slot.lock();
        let Some(frame) = &*stored else {
            return Ok(0);
        };
        let copied = match &frame.data {
            NDDataBuffer::$direct_variant(samples) => copy_direct(samples, $buf),
            $( NDDataBuffer::$variant(samples) => copy_convert(samples, $buf), )*
        };
        Ok(copied)
    }};
}
949
950impl PortDriver for PluginPortDriver {
951 fn base(&self) -> &PortDriverBase {
952 &self.base
953 }
954
955 fn base_mut(&mut self) -> &mut PortDriverBase {
956 &mut self.base
957 }
958
959 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
960 let reason = user.reason;
961 let addr = user.addr;
962 self.base.set_int32_param(reason, addr, value)?;
963 self.base.call_param_callbacks(addr)?;
964 let _ = self
965 .param_change_tx
966 .try_send((reason, addr, ParamChangeValue::Int32(value)));
967 Ok(())
968 }
969
970 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
971 let reason = user.reason;
972 let addr = user.addr;
973 self.base.set_float64_param(reason, addr, value)?;
974 self.base.call_param_callbacks(addr)?;
975 let _ = self
976 .param_change_tx
977 .try_send((reason, addr, ParamChangeValue::Float64(value)));
978 Ok(())
979 }
980
981 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
982 let reason = user.reason;
983 let addr = user.addr;
984 let s = String::from_utf8_lossy(data).into_owned();
985 self.base.set_string_param(reason, addr, s.clone())?;
986 self.base.call_param_callbacks(addr)?;
987 let _ = self
988 .param_change_tx
989 .try_send((reason, addr, ParamChangeValue::Octet(s)));
990 Ok(())
991 }
992
993 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
994 impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
995 }
996
997 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
998 impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
999 }
1000
1001 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1002 impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
1003 }
1004
1005 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1006 impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
1007 }
1008
1009 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1010 impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
1011 }
1012}
1013
1014#[derive(Clone)]
1016pub struct PluginRuntimeHandle {
1017 port_runtime: PortRuntimeHandle,
1018 array_sender: NDArraySender,
1019 array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
1020 port_name: String,
1021 pub ndarray_params: NDArrayDriverParams,
1022 pub plugin_params: PluginBaseParams,
1023}
1024
1025impl PluginRuntimeHandle {
1026 pub fn port_runtime(&self) -> &PortRuntimeHandle {
1027 &self.port_runtime
1028 }
1029
1030 pub fn array_sender(&self) -> &NDArraySender {
1031 &self.array_sender
1032 }
1033
1034 pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
1035 &self.array_output
1036 }
1037
1038 pub fn port_name(&self) -> &str {
1039 &self.port_name
1040 }
1041}
1042
1043pub fn create_plugin_runtime<P: NDPluginProcess>(
1050 port_name: &str,
1051 processor: P,
1052 pool: Arc<NDArrayPool>,
1053 queue_size: usize,
1054 ndarray_port: &str,
1055 wiring: Arc<WiringRegistry>,
1056) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1057 create_plugin_runtime_multi_addr(
1058 port_name,
1059 processor,
1060 pool,
1061 queue_size,
1062 ndarray_port,
1063 wiring,
1064 1,
1065 )
1066}
1067
1068pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1072 port_name: &str,
1073 mut processor: P,
1074 pool: Arc<NDArrayPool>,
1075 queue_size: usize,
1076 ndarray_port: &str,
1077 wiring: Arc<WiringRegistry>,
1078 max_addr: usize,
1079) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1080 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1082
1083 let plugin_type_name = processor.plugin_type().to_string();
1085 let array_data = processor.array_data_handle();
1086
1087 let driver = PluginPortDriver::new(
1089 port_name,
1090 &plugin_type_name,
1091 queue_size,
1092 ndarray_port,
1093 max_addr,
1094 param_tx,
1095 &mut processor,
1096 array_data,
1097 )
1098 .expect("failed to create plugin port driver");
1099
1100 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1101 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1102 let min_callback_time_reason = driver.plugin_params.min_callback_time;
1103 let sort_mode_reason = driver.plugin_params.sort_mode;
1104 let sort_time_reason = driver.plugin_params.sort_time;
1105 let sort_size_reason = driver.plugin_params.sort_size;
1106 let ndarray_params = driver.ndarray_params;
1107 let plugin_params = driver.plugin_params;
1108 let std_array_data_param = driver.std_array_data_param;
1109
1110 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1112
1113 let port_handle = port_runtime.port_handle().clone();
1115
1116 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1118
1119 let enabled = Arc::new(AtomicBool::new(false));
1121 let blocking_mode = Arc::new(AtomicBool::new(false));
1122
1123 let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1125 let array_output_for_handle = array_output.clone();
1126 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1127 processor,
1128 output: array_output,
1129 pool,
1130 ndarray_params,
1131 plugin_params,
1132 port_handle,
1133 array_counter: 0,
1134 std_array_data_param,
1135 min_callback_time: 0.0,
1136 last_process_time: None,
1137 sort_mode: 0,
1138 sort_time: 0.0,
1139 sort_size: 10,
1140 sort_buffer: SortBuffer::new(),
1141 }));
1142
1143 let data_enabled = enabled.clone();
1144 let data_blocking = blocking_mode.clone();
1145
1146 let mut array_sender = array_sender;
1147 array_sender.set_mode_flags(enabled, blocking_mode);
1148
1149 let nd_array_port_reason = plugin_params.nd_array_port;
1151 let sender_port_name = port_name.to_string();
1152 let initial_upstream = ndarray_port.to_string();
1153
1154 let data_jh = thread::Builder::new()
1156 .name(format!("plugin-data-{port_name}"))
1157 .spawn(move || {
1158 plugin_data_loop(
1159 shared,
1160 array_rx,
1161 param_rx,
1162 enable_callbacks_reason,
1163 blocking_callbacks_reason,
1164 min_callback_time_reason,
1165 sort_mode_reason,
1166 sort_time_reason,
1167 sort_size_reason,
1168 data_enabled,
1169 data_blocking,
1170 nd_array_port_reason,
1171 sender_port_name,
1172 initial_upstream,
1173 wiring,
1174 );
1175 })
1176 .expect("failed to spawn plugin data thread");
1177
1178 let handle = PluginRuntimeHandle {
1179 port_runtime,
1180 array_sender,
1181 array_output: array_output_for_handle,
1182 port_name: port_name.to_string(),
1183 ndarray_params,
1184 plugin_params,
1185 };
1186
1187 (handle, data_jh)
1188}
1189
1190fn plugin_data_loop<P: NDPluginProcess>(
1191 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1192 mut array_rx: NDArrayReceiver,
1193 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1194 enable_callbacks_reason: usize,
1195 blocking_callbacks_reason: usize,
1196 min_callback_time_reason: usize,
1197 sort_mode_reason: usize,
1198 sort_time_reason: usize,
1199 sort_size_reason: usize,
1200 enabled: Arc<AtomicBool>,
1201 blocking_mode: Arc<AtomicBool>,
1202 nd_array_port_reason: usize,
1203 sender_port_name: String,
1204 initial_upstream: String,
1205 wiring: Arc<WiringRegistry>,
1206) {
1207 let mut current_upstream = initial_upstream;
1208 let rt = tokio::runtime::Builder::new_current_thread()
1209 .enable_all()
1210 .build()
1211 .unwrap();
1212 rt.block_on(async {
1213 let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1216 let mut sort_flush_active = false;
1217
1218 loop {
1219 tokio::select! {
1220 msg = array_rx.recv_msg() => {
1221 match msg {
1222 Some(msg) => {
1223 let (process_output, senders, port) = {
1225 let mut guard = shared.lock();
1226 let output = guard.process_and_publish(&msg.array);
1227 let senders = guard.output.lock().senders_clone();
1228 let port = guard.port_handle.clone();
1229 (output, senders, port)
1230 };
1231 if let Some(po) = process_output {
1234 po.publish_arrays(&senders).await;
1235 po.batch.flush(&port).await;
1236 }
1237 }
1238 None => break,
1239 }
1240 }
1241 param = param_rx.recv() => {
1242 match param {
1243 Some((reason, addr, value)) => {
1244 if reason == enable_callbacks_reason {
1245 enabled.store(value.as_i32() != 0, Ordering::Release);
1246 }
1247 if reason == blocking_callbacks_reason {
1248 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1249 }
1250 if reason == min_callback_time_reason {
1252 shared.lock().min_callback_time = value.as_f64();
1253 }
1254 if reason == sort_mode_reason {
1256 let mode = value.as_i32();
1257 let flush_work = {
1260 let mut guard = shared.lock();
1261 guard.sort_mode = mode;
1262 if mode == 0 {
1263 let output = guard.flush_sort_buffer();
1264 let senders = guard.output.lock().senders_clone();
1265 let port = guard.port_handle.clone();
1266 sort_flush_active = false;
1267 Some((output, senders, port))
1268 } else {
1269 sort_flush_active = guard.sort_time > 0.0;
1270 if sort_flush_active {
1271 let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1272 sort_flush_interval = tokio::time::interval(dur);
1273 }
1274 None
1275 }
1276 };
1277 if let Some((output, senders, port)) = flush_work {
1278 output.publish_arrays(&senders).await;
1279 output.batch.flush(&port).await;
1280 }
1281 }
1282 if reason == sort_time_reason {
1283 let t = value.as_f64();
1284 let mut guard = shared.lock();
1285 guard.sort_time = t;
1286 if guard.sort_mode != 0 && t > 0.0 {
1287 sort_flush_active = true;
1288 let dur = std::time::Duration::from_secs_f64(t);
1289 sort_flush_interval = tokio::time::interval(dur);
1290 } else {
1291 sort_flush_active = false;
1292 }
1293 drop(guard);
1294 }
1295 if reason == sort_size_reason {
1296 shared.lock().sort_size = value.as_i32();
1297 }
1298 if reason == nd_array_port_reason {
1300 if let Some(new_port) = value.as_string() {
1301 if new_port != current_upstream {
1302 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1303 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1304 eprintln!("NDArrayPort rewire failed: {e}");
1305 current_upstream = old;
1306 }
1307 }
1308 }
1309 }
1310 let snapshot = PluginParamSnapshot {
1311 enable_callbacks: enabled.load(Ordering::Acquire),
1312 reason,
1313 addr,
1314 value,
1315 };
1316 let (process_output, senders, port) = {
1317 let mut guard = shared.lock();
1318 let t0 = std::time::Instant::now();
1319 let result = guard.processor.on_param_change(reason, &snapshot);
1320 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1321 let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1322 Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
1323 } else {
1324 None
1325 };
1326 let senders = guard.output.lock().senders_clone();
1327 (output, senders, guard.port_handle.clone())
1328 };
1329 if let Some(po) = process_output {
1330 po.publish_arrays(&senders).await;
1331 po.batch.flush(&port).await;
1332 }
1333 }
1334 None => break,
1335 }
1336 }
1337 _ = sort_flush_interval.tick(), if sort_flush_active => {
1338 let (output, senders, port) = {
1339 let mut guard = shared.lock();
1340 let output = guard.flush_sort_buffer();
1341 let senders = guard.output.lock().senders_clone();
1342 let port = guard.port_handle.clone();
1343 (output, senders, port)
1344 };
1345 output.publish_arrays(&senders).await;
1346 output.batch.flush(&port).await;
1347 }
1348 }
1349 }
1350 });
1351}
1352
1353pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1355 upstream.array_output().lock().add(downstream_sender);
1356}
1357
1358pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1360 port_name: &str,
1361 mut processor: P,
1362 pool: Arc<NDArrayPool>,
1363 queue_size: usize,
1364 output: NDArrayOutput,
1365 ndarray_port: &str,
1366 wiring: Arc<WiringRegistry>,
1367) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1368 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1369
1370 let plugin_type_name = processor.plugin_type().to_string();
1371 let array_data = processor.array_data_handle();
1372 let driver = PluginPortDriver::new(
1373 port_name,
1374 &plugin_type_name,
1375 queue_size,
1376 ndarray_port,
1377 1,
1378 param_tx,
1379 &mut processor,
1380 array_data,
1381 )
1382 .expect("failed to create plugin port driver");
1383
1384 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1385 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1386 let min_callback_time_reason = driver.plugin_params.min_callback_time;
1387 let sort_mode_reason = driver.plugin_params.sort_mode;
1388 let sort_time_reason = driver.plugin_params.sort_time;
1389 let sort_size_reason = driver.plugin_params.sort_size;
1390 let ndarray_params = driver.ndarray_params;
1391 let plugin_params = driver.plugin_params;
1392 let std_array_data_param = driver.std_array_data_param;
1393
1394 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1395
1396 let port_handle = port_runtime.port_handle().clone();
1397
1398 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1399
1400 let enabled = Arc::new(AtomicBool::new(false));
1401 let blocking_mode = Arc::new(AtomicBool::new(false));
1402
1403 let array_output = Arc::new(parking_lot::Mutex::new(output));
1404 let array_output_for_handle = array_output.clone();
1405 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1406 processor,
1407 output: array_output,
1408 pool,
1409 ndarray_params,
1410 plugin_params,
1411 port_handle,
1412 array_counter: 0,
1413 std_array_data_param,
1414 min_callback_time: 0.0,
1415 last_process_time: None,
1416 sort_mode: 0,
1417 sort_time: 0.0,
1418 sort_size: 10,
1419 sort_buffer: SortBuffer::new(),
1420 }));
1421
1422 let data_enabled = enabled.clone();
1423 let data_blocking = blocking_mode.clone();
1424
1425 let mut array_sender = array_sender;
1426 array_sender.set_mode_flags(enabled, blocking_mode);
1427
1428 let nd_array_port_reason = plugin_params.nd_array_port;
1430 let sender_port_name = port_name.to_string();
1431 let initial_upstream = ndarray_port.to_string();
1432
1433 let data_jh = thread::Builder::new()
1434 .name(format!("plugin-data-{port_name}"))
1435 .spawn(move || {
1436 plugin_data_loop(
1437 shared,
1438 array_rx,
1439 param_rx,
1440 enable_callbacks_reason,
1441 blocking_callbacks_reason,
1442 min_callback_time_reason,
1443 sort_mode_reason,
1444 sort_time_reason,
1445 sort_size_reason,
1446 data_enabled,
1447 data_blocking,
1448 nd_array_port_reason,
1449 sender_port_name,
1450 initial_upstream,
1451 wiring,
1452 );
1453 })
1454 .expect("failed to spawn plugin data thread");
1455
1456 let handle = PluginRuntimeHandle {
1457 port_runtime,
1458 array_sender,
1459 array_output: array_output_for_handle,
1460 port_name: port_name.to_string(),
1461 ndarray_params,
1462 plugin_params,
1463 };
1464
1465 (handle, data_jh)
1466}
1467
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::{NDArrayReceiver, ndarray_channel};
    use std::time::Duration;

    /// Processor that forwards a copy of every input array.
    struct PassthroughProcessor;

    impl NDPluginProcess for PassthroughProcessor {
        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            let copy = Arc::new(array.clone());
            ProcessResult::arrays(vec![copy])
        }

        fn plugin_type(&self) -> &str {
            "Passthrough"
        }
    }

    /// Processor that counts inputs and produces no output.
    struct SinkProcessor {
        count: usize,
    }

    impl NDPluginProcess for SinkProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count += 1;
            ProcessResult::empty()
        }

        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }

    /// One-dimensional, 4-element UInt8 array tagged with `id`.
    fn make_test_array(id: i32) -> Arc<NDArray> {
        let dims = vec![NDDimension::new(4)];
        let mut array = NDArray::new(dims, NDDataType::UInt8);
        array.unique_id = id;
        Arc::new(array)
    }

    /// Fresh, empty wiring registry.
    fn test_wiring() -> Arc<WiringRegistry> {
        Arc::new(WiringRegistry::new())
    }

    /// Array pool used by every test.
    fn test_pool() -> Arc<NDArrayPool> {
        Arc::new(NDArrayPool::new(1_000_000))
    }

    /// Single-threaded tokio runtime with all drivers enabled.
    fn current_thread_rt() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    /// Blocking int32 write to `reason` at address 0 on the plugin's port.
    fn write_int32(handle: &PluginRuntimeHandle, reason: usize, value: i32) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(reason, 0, value)
            .unwrap();
    }

    /// Sleep the test thread so the plugin's background threads can catch up.
    fn settle(millis: u64) {
        std::thread::sleep(Duration::from_millis(millis));
    }

    /// Write EnableCallbacks = 1 and wait briefly for it to take effect.
    fn enable_callbacks(handle: &PluginRuntimeHandle) {
        write_int32(handle, handle.plugin_params.enable_callbacks, 1);
        settle(10);
    }

    /// Publish `array` through a clone of `sender` from a helper thread that
    /// owns its own runtime, and wait for the publish to finish.
    fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
        let publisher = sender.clone();
        std::thread::spawn(move || {
            let rt = current_thread_rt();
            rt.block_on(publisher.publish(array));
        })
        .join()
        .unwrap();
    }

    /// Start a plugin runtime for `processor` with one downstream channel
    /// named "DOWNSTREAM" (capacity 10) attached to its output.
    ///
    /// The receiver comes first in the tuple so that, when destructured in
    /// this order, it is dropped after the handle at the end of a test.
    fn runtime_with_downstream<P: NDPluginProcess>(
        port_name: &str,
        processor: P,
    ) -> (
        NDArrayReceiver,
        PluginRuntimeHandle,
        std::thread::JoinHandle<()>,
    ) {
        let pool = test_pool();
        let (downstream_sender, downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let mut output = NDArrayOutput::new();
        output.add(downstream_sender);

        let (handle, data_jh) = create_plugin_runtime_with_output(
            port_name,
            processor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        (downstream_rx, handle, data_jh)
    }

    /// An enabled passthrough plugin delivers the input array downstream.
    #[test]
    fn test_passthrough_runtime() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("PASS1", PassthroughProcessor);
        enable_callbacks(&handle);

        send_array(handle.array_sender(), make_test_array(42));

        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 42);
    }

    /// A plugin with no downstream accepts arrays and keeps its port name.
    #[test]
    fn test_sink_runtime() {
        let (handle, _data_jh) = create_plugin_runtime(
            "SINK1",
            SinkProcessor { count: 0 },
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        for id in [1, 2] {
            send_array(handle.array_sender(), make_test_array(id));
        }

        settle(50);

        assert_eq!(handle.port_name(), "SINK1");
    }

    /// The handle and its port runtime both report the configured port name.
    #[test]
    fn test_plugin_type_param() {
        let (handle, _data_jh) = create_plugin_runtime(
            "TYPE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        assert_eq!(handle.port_name(), "TYPE_TEST");
        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
    }

    /// Dropping the handle and every sender lets the data thread exit cleanly.
    #[test]
    fn test_shutdown_on_handle_drop() {
        let (handle, data_jh) = create_plugin_runtime(
            "SHUTDOWN_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        let sender = handle.array_sender().clone();
        drop(handle);
        drop(sender);

        let joined = data_jh.join();
        assert!(joined.is_ok());
    }

    /// Passthrough delivery with the default (non-blocking) mode flags.
    #[test]
    fn test_nonblocking_passthrough() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("NB_TEST", PassthroughProcessor);
        enable_callbacks(&handle);

        send_array(handle.array_sender(), make_test_array(42));

        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 42);
    }

    /// Arrays are delivered both after BlockingCallbacks is set to 1 and
    /// after it is set back to 0.
    #[test]
    fn test_blocking_to_nonblocking_switch() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("SWITCH_TEST", PassthroughProcessor);
        enable_callbacks(&handle);
        let blocking_reason = handle.plugin_params.blocking_callbacks;

        write_int32(&handle, blocking_reason, 1);
        settle(50);

        send_array(handle.array_sender(), make_test_array(1));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 1);

        write_int32(&handle, blocking_reason, 0);
        settle(50);

        send_array(handle.array_sender(), make_test_array(2));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 2);
    }

    /// With EnableCallbacks = 0 nothing reaches the downstream channel.
    #[test]
    fn test_enable_callbacks_disables_processing() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("ENABLE_TEST", PassthroughProcessor);

        write_int32(&handle, handle.plugin_params.enable_callbacks, 0);
        settle(50);

        send_array(handle.array_sender(), make_test_array(99));

        let rt = current_thread_rt();
        let outcome = rt.block_on(async {
            tokio::time::timeout(Duration::from_millis(100), downstream_rx.recv()).await
        });
        assert!(
            outcome.is_err(),
            "should not receive array when callbacks disabled"
        );
    }

    /// Every registered downstream receives each published array.
    #[test]
    fn test_downstream_receives_multiple() {
        let pool = test_pool();

        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
        let mut output = NDArrayOutput::new();
        output.add(ds1);
        output.add(ds2);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "DS_TEST",
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        send_array(handle.array_sender(), make_test_array(77));

        let first = rx1.blocking_recv().unwrap();
        let second = rx2.blocking_recv().unwrap();
        assert_eq!(first.unique_id, 77);
        assert_eq!(second.unique_id, 77);
    }

    /// Arrays keep flowing after a parameter write that follows the first send.
    #[test]
    fn test_param_updates_after_send() {
        struct ParamTracker;
        impl NDPluginProcess for ParamTracker {
            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                let copy = Arc::new(array.clone());
                ProcessResult::arrays(vec![copy])
            }

            fn plugin_type(&self) -> &str {
                "ParamTracker"
            }
        }

        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("PARAM_TEST", ParamTracker);
        enable_callbacks(&handle);

        send_array(handle.array_sender(), make_test_array(1));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 1);

        write_int32(&handle, handle.plugin_params.enable_callbacks, 1);
        settle(50);

        send_array(handle.array_sender(), make_test_array(2));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 2);
    }

    /// Entries inserted out of order drain in ascending id order.
    #[test]
    fn test_sort_buffer_reorders_by_unique_id() {
        let mut buf = SortBuffer::new();

        for id in [3, 1, 2] {
            buf.insert(id, vec![make_test_array(id)], 10);
        }

        assert_eq!(buf.len(), 3);

        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|entry| entry.0).collect();
        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.last_emitted_id, 3);
    }

    /// Inserting an id lower than one already drained counts as disordered.
    #[test]
    fn test_sort_buffer_detects_disordered() {
        let mut buf = SortBuffer::new();

        buf.insert(5, vec![make_test_array(5)], 10);
        buf.drain_all();

        buf.insert(3, vec![make_test_array(3)], 10);
        assert_eq!(buf.disordered_arrays, 1);
    }

    /// Exceeding the size limit drops the lowest id and counts the drop.
    #[test]
    fn test_sort_buffer_drops_when_full() {
        let mut buf = SortBuffer::new();

        for id in [1, 2, 3] {
            buf.insert(id, vec![make_test_array(id)], 2);
        }

        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped_output_arrays, 1);

        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|entry| entry.0).collect();
        assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
    }

    /// With sort mode on, arrays are held back; switching it off flushes them
    /// downstream in ascending id order.
    #[test]
    fn test_sort_mode_runtime_integration() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("SORT_TEST", PassthroughProcessor);
        enable_callbacks(&handle);

        write_int32(&handle, handle.plugin_params.sort_size, 10);
        write_int32(&handle, handle.plugin_params.sort_mode, 1);
        settle(50);

        for id in [3, 1, 2] {
            send_array(handle.array_sender(), make_test_array(id));
        }
        settle(100);

        let rt = current_thread_rt();
        let outcome = rt.block_on(async {
            tokio::time::timeout(Duration::from_millis(50), downstream_rx.recv()).await
        });
        assert!(
            outcome.is_err(),
            "arrays should be buffered while sort mode is active"
        );

        write_int32(&handle, handle.plugin_params.sort_mode, 0);
        settle(100);

        let first = downstream_rx.blocking_recv().unwrap();
        let second = downstream_rx.blocking_recv().unwrap();
        let third = downstream_rx.blocking_recv().unwrap();
        assert_eq!(first.unique_id, 1);
        assert_eq!(second.unique_id, 2);
        assert_eq!(third.unique_id, 3);
    }
}