1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{
19 BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
20};
21use super::params::PluginBaseParams;
22use super::wiring::WiringRegistry;
23
/// Value carried by a parameter-change message sent from the port driver's
/// write handlers to the plugin data loop.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// Integer payload.
    Int32(i32),
    /// Floating-point payload.
    Float64(f64),
    /// String payload.
    Octet(String),
}

impl ParamChangeValue {
    /// Integer view of the value.
    ///
    /// Floats go through an `as` cast (truncation toward zero, saturating at
    /// the `i32` bounds); strings always yield `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(int) => int,
            Self::Float64(float) => float as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Floating-point view of the value; strings always yield `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(int) => int as f64,
            Self::Float64(float) => float,
            Self::Octet(_) => 0.0,
        }
    }

    /// Borrowed string payload, or `None` for the numeric variants.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text)
        } else {
            None
        }
    }
}
56
/// A parameter value that a plugin wants written back to its port.
///
/// Every variant carries the parameter index (`reason`), the asyn address
/// (`addr`) and the new value.
pub enum ParamUpdate {
    /// 32-bit integer parameter.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// 64-bit floating-point parameter.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// String parameter.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    /// Array-of-`f64` parameter.
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Integer update on address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Floating-point update on address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Integer update on an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Floating-point update on an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// String update on address 0.
    pub fn octet(reason: usize, value: impl Into<String>) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// String update on an explicit address.
    pub fn octet_addr(reason: usize, addr: i32, value: impl Into<String>) -> Self {
        Self::Octet {
            reason,
            addr,
            value: value.into(),
        }
    }

    /// `f64` array update on address 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// `f64` array update on an explicit address.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array {
            reason,
            addr,
            value,
        }
    }
}
131
132pub struct ProcessResult {
134 pub output_arrays: Vec<Arc<NDArray>>,
135 pub param_updates: Vec<ParamUpdate>,
136 pub scatter_index: Option<usize>,
138}
139
140impl ProcessResult {
141 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
143 Self {
144 output_arrays: vec![],
145 param_updates,
146 scatter_index: None,
147 }
148 }
149
150 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
152 Self {
153 output_arrays,
154 param_updates: vec![],
155 scatter_index: None,
156 }
157 }
158
159 pub fn empty() -> Self {
161 Self {
162 output_arrays: vec![],
163 param_updates: vec![],
164 scatter_index: None,
165 }
166 }
167
168 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
170 Self {
171 output_arrays,
172 param_updates: vec![],
173 scatter_index: Some(index),
174 }
175 }
176}
177
178pub struct ParamChangeResult {
180 pub output_arrays: Vec<Arc<NDArray>>,
181 pub param_updates: Vec<ParamUpdate>,
182}
183
184impl ParamChangeResult {
185 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
186 Self {
187 output_arrays: vec![],
188 param_updates,
189 }
190 }
191
192 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
193 Self {
194 output_arrays,
195 param_updates: vec![],
196 }
197 }
198
199 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
200 Self {
201 output_arrays,
202 param_updates,
203 }
204 }
205
206 pub fn empty() -> Self {
207 Self {
208 output_arrays: vec![],
209 param_updates: vec![],
210 }
211 }
212}
213
/// Processing logic plugged into the generic plugin runtime in this file.
///
/// The runtime owns the processor behind a mutex, which is why implementors
/// must be `Send + 'static`.
pub trait NDPluginProcess: Send + 'static {
    /// Processes one input `array` and returns the arrays and parameter
    /// updates to publish. `pool` is the array pool given to the runtime at
    /// creation.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Name stored in the port's `plugin_type` string parameter when the
    /// driver is created.
    fn plugin_type(&self) -> &str;

    /// Hook for creating plugin-specific parameters on the driver base.
    ///
    /// Called from `PluginPortDriver::new` after the common parameters have
    /// been created. The default implementation registers nothing.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called by the data loop for every parameter change forwarded by the
    /// port driver, including the ones the runtime handles itself.
    ///
    /// The default implementation returns an empty result.
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Optional shared slot holding an array for the port's array reads.
    ///
    /// When `Some`, the driver creates a `STD_ARRAY_DATA` parameter and
    /// serves `read_*_array` requests from the slot. The default is `None`.
    // NOTE(review): who fills the slot is not visible in this file —
    // presumably the processor itself; confirm against implementors.
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
247
/// Context passed to `NDPluginProcess::on_param_change`.
pub struct PluginParamSnapshot {
    /// Value of the runtime's enable flag at the time of the change.
    pub enable_callbacks: bool,
    /// Index of the parameter that changed.
    pub reason: usize,
    /// Asyn address the write was issued on.
    pub addr: i32,
    /// The value that was written.
    pub value: ParamChangeValue,
}
258
259struct SortBuffer {
265 entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
267 last_emitted_id: i32,
269 disordered_arrays: i32,
271 dropped_output_arrays: i32,
273}
274
275impl SortBuffer {
276 fn new() -> Self {
277 Self {
278 entries: BTreeMap::new(),
279 last_emitted_id: 0,
280 disordered_arrays: 0,
281 dropped_output_arrays: 0,
282 }
283 }
284
285 fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
287 if unique_id < self.last_emitted_id {
288 self.disordered_arrays += 1;
289 }
290 self.entries.entry(unique_id).or_default().extend(arrays);
291
292 while sort_size > 0 && self.entries.len() as i32 > sort_size {
294 if let Some((&oldest_key, _)) = self.entries.iter().next() {
295 self.entries.remove(&oldest_key);
296 self.dropped_output_arrays += 1;
297 }
298 }
299 }
300
301 fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
303 let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
304 if let Some(&(last_id, _)) = entries.last() {
305 self.last_emitted_id = last_id;
306 }
307 entries
308 }
309
310 fn len(&self) -> i32 {
312 self.entries.len() as i32
313 }
314}
315
/// State shared (behind a mutex) between the plugin data loop and the
/// blocking-callback handle.
struct SharedProcessorInner<P: NDPluginProcess> {
    /// The plugin's processing logic.
    processor: P,
    /// Downstream outputs that processed arrays are published to.
    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Array pool passed to every `process_array` call.
    pool: Arc<NDArrayPool>,
    /// Indices of the common NDArray driver parameters.
    ndarray_params: NDArrayDriverParams,
    /// Indices of the common plugin parameters.
    plugin_params: PluginBaseParams,
    /// Handle used to write parameters and raise interrupts on the port.
    port_handle: PortHandle,
    /// Incremented each time array metadata is reported to the port.
    array_counter: i32,
    /// Index of the `STD_ARRAY_DATA` parameter, if the plugin exposes one.
    std_array_data_param: Option<usize>,
    /// Minimum spacing between processed arrays, in seconds; `<= 0` disables
    /// throttling.
    min_callback_time: f64,
    /// Start time of the most recent processing call.
    last_process_time: Option<std::time::Instant>,
    /// Non-zero routes output arrays through `sort_buffer` instead of
    /// publishing them immediately.
    sort_mode: i32,
    /// Sort-buffer flush period in seconds, as set by the data loop.
    sort_time: f64,
    /// Maximum number of ids kept in `sort_buffer`.
    sort_size: i32,
    /// Buffer that re-orders output arrays by unique id.
    sort_buffer: SortBuffer,
}
341
342impl<P: NDPluginProcess> SharedProcessorInner<P> {
343 fn should_throttle(&self) -> bool {
344 if self.min_callback_time <= 0.0 {
345 return false;
346 }
347 if let Some(last) = self.last_process_time {
348 last.elapsed().as_secs_f64() < self.min_callback_time
349 } else {
350 false
351 }
352 }
353
354 fn process_and_publish(&mut self, array: &NDArray) {
355 if self.should_throttle() {
356 return;
357 }
358 let t0 = std::time::Instant::now();
359 let result = self.processor.process_array(array, &self.pool);
360 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
361 self.last_process_time = Some(t0);
362
363 if self.sort_mode != 0 && !result.output_arrays.is_empty() {
364 let unique_id = array.unique_id;
366 self.sort_buffer
367 .insert(unique_id, result.output_arrays, self.sort_size);
368 self.update_sort_params();
370 if !result.param_updates.is_empty() {
372 self.publish_result(
373 vec![],
374 result.param_updates,
375 result.scatter_index,
376 Some(array),
377 elapsed_ms,
378 );
379 }
380 } else {
381 self.publish_result(
382 result.output_arrays,
383 result.param_updates,
384 result.scatter_index,
385 Some(array),
386 elapsed_ms,
387 );
388 }
389 }
390
391 fn flush_sort_buffer(&mut self) {
393 let entries = self.sort_buffer.drain_all();
394 for (_unique_id, arrays) in entries {
395 self.publish_result(arrays, vec![], None, None, 0.0);
396 }
397 self.update_sort_params();
398 }
399
400 fn update_sort_params(&self) {
402 let sort_free = self.sort_size - self.sort_buffer.len();
403 self.port_handle
404 .write_int32_no_wait(self.plugin_params.sort_free, 0, sort_free);
405 self.port_handle.write_int32_no_wait(
406 self.plugin_params.disordered_arrays,
407 0,
408 self.sort_buffer.disordered_arrays,
409 );
410 self.port_handle.write_int32_no_wait(
411 self.plugin_params.dropped_output_arrays,
412 0,
413 self.sort_buffer.dropped_output_arrays,
414 );
415 }
416
417 fn publish_result(
418 &mut self,
419 output_arrays: Vec<Arc<NDArray>>,
420 param_updates: Vec<ParamUpdate>,
421 scatter_index: Option<usize>,
422 fallback_array: Option<&NDArray>,
423 elapsed_ms: f64,
424 ) {
425 let output = self.output.lock();
426 for out in &output_arrays {
427 if let Some(idx) = scatter_index {
428 output.publish_to(idx, out.clone());
429 } else {
430 output.publish(out.clone());
431 }
432 }
433 drop(output);
434
435 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
436 self.array_counter += 1;
437
438 if let Some(param) = self.std_array_data_param {
441 use crate::ndarray::NDDataBuffer;
442 use asyn_rs::param::ParamValue;
443 let value = match &report_arr.data {
444 NDDataBuffer::I8(v) => {
445 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
446 }
447 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
448 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
449 ))),
450 NDDataBuffer::I16(v) => {
451 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
452 }
453 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
454 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
455 ))),
456 NDDataBuffer::I32(v) => {
457 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
458 }
459 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
460 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
461 ))),
462 NDDataBuffer::I64(v) => {
463 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
464 }
465 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
466 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
467 ))),
468 NDDataBuffer::F32(v) => {
469 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
470 }
471 NDDataBuffer::F64(v) => {
472 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
473 }
474 };
475 if let Some(value) = value {
476 let ts = report_arr.timestamp.to_system_time();
477 self.port_handle
478 .interrupts()
479 .notify(asyn_rs::interrupt::InterruptValue {
480 reason: param,
481 addr: 0,
482 value,
483 timestamp: ts,
484 uint32_changed_mask: 0,
485 });
486 }
487 }
488
489 let info = report_arr.info();
490 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
491 self.port_handle.write_int32_no_wait(
492 self.ndarray_params.array_counter,
493 0,
494 self.array_counter,
495 );
496 self.port_handle.write_int32_no_wait(
497 self.ndarray_params.unique_id,
498 0,
499 report_arr.unique_id,
500 );
501 self.port_handle.write_int32_no_wait(
502 self.ndarray_params.n_dimensions,
503 0,
504 report_arr.dims.len() as i32,
505 );
506 self.port_handle.write_int32_no_wait(
507 self.ndarray_params.array_size_x,
508 0,
509 info.x_size as i32,
510 );
511 self.port_handle.write_int32_no_wait(
512 self.ndarray_params.array_size_y,
513 0,
514 info.y_size as i32,
515 );
516 self.port_handle.write_int32_no_wait(
517 self.ndarray_params.array_size_z,
518 0,
519 info.color_size as i32,
520 );
521 self.port_handle.write_int32_no_wait(
522 self.ndarray_params.array_size,
523 0,
524 info.total_bytes as i32,
525 );
526 self.port_handle.write_int32_no_wait(
527 self.ndarray_params.data_type,
528 0,
529 report_arr.data.data_type() as i32,
530 );
531 self.port_handle
532 .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
533
534 let ts_f64 = report_arr.timestamp.as_f64();
535 self.port_handle
536 .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
537 self.port_handle.write_int32_no_wait(
538 self.ndarray_params.epics_ts_sec,
539 0,
540 report_arr.timestamp.sec as i32,
541 );
542 self.port_handle.write_int32_no_wait(
543 self.ndarray_params.epics_ts_nsec,
544 0,
545 report_arr.timestamp.nsec as i32,
546 );
547 }
548
549 self.port_handle
550 .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
551
552 use asyn_rs::request::ParamSetValue;
555
556 let mut addr0_updates: Vec<ParamSetValue> = Vec::new();
557 let mut extra_addr_map: std::collections::HashMap<i32, Vec<ParamSetValue>> =
558 std::collections::HashMap::new();
559
560 for update in ¶m_updates {
561 match update {
562 ParamUpdate::Int32 {
563 reason,
564 addr,
565 value,
566 } => {
567 let pv = ParamSetValue::Int32 {
568 reason: *reason,
569 addr: *addr,
570 value: *value,
571 };
572 if *addr == 0 {
573 addr0_updates.push(pv);
574 } else {
575 extra_addr_map.entry(*addr).or_default().push(pv);
576 }
577 }
578 ParamUpdate::Float64 {
579 reason,
580 addr,
581 value,
582 } => {
583 let pv = ParamSetValue::Float64 {
584 reason: *reason,
585 addr: *addr,
586 value: *value,
587 };
588 if *addr == 0 {
589 addr0_updates.push(pv);
590 } else {
591 extra_addr_map.entry(*addr).or_default().push(pv);
592 }
593 }
594 ParamUpdate::Octet {
595 reason,
596 addr,
597 value,
598 } => {
599 let pv = ParamSetValue::Octet {
600 reason: *reason,
601 addr: *addr,
602 value: value.clone(),
603 };
604 if *addr == 0 {
605 addr0_updates.push(pv);
606 } else {
607 extra_addr_map.entry(*addr).or_default().push(pv);
608 }
609 }
610 ParamUpdate::Float64Array {
611 reason,
612 addr,
613 value,
614 } => {
615 let pv = ParamSetValue::Float64Array {
616 reason: *reason,
617 addr: *addr,
618 value: value.clone(),
619 };
620 if *addr == 0 {
621 addr0_updates.push(pv);
622 } else {
623 extra_addr_map.entry(*addr).or_default().push(pv);
624 }
625 }
626 }
627 }
628
629 self.port_handle.set_params_and_notify(0, addr0_updates);
630 for (addr, updates) in extra_addr_map {
631 self.port_handle.set_params_and_notify(addr, updates);
632 }
633 }
634}
635
636struct BlockingProcessorHandle<P: NDPluginProcess> {
639 inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
640}
641
642impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
643 fn process_and_publish(&self, array: &NDArray) {
644 self.inner.lock().process_and_publish(array);
645 }
646}
647
/// Asyn port driver backing a plugin: owns the parameter library and forwards
/// parameter writes to the plugin data loop.
// NOTE(review): `dead_code` is allowed presumably because some fields are
// only read through copies taken at construction time — confirm.
#[allow(dead_code)]
pub struct PluginPortDriver {
    /// Underlying asyn driver base holding the parameter library.
    base: PortDriverBase,
    /// Indices of the common NDArray driver parameters.
    ndarray_params: NDArrayDriverParams,
    /// Indices of the common plugin parameters.
    plugin_params: PluginBaseParams,
    /// Channel carrying `(reason, addr, value)` for every accepted write.
    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
    /// Shared array slot served by the `read_*_array` methods, if any.
    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    /// Index of the `STD_ARRAY_DATA` parameter; only created when
    /// `array_data` is present.
    std_array_data_param: Option<usize>,
}

impl PluginPortDriver {
    /// Builds the driver, creates the common parameters and lets `processor`
    /// register its own.
    ///
    /// * `port_name` – asyn port name; also stored in `port_name_self`.
    /// * `plugin_type_name` – stored in the `plugin_type` parameter.
    /// * `queue_size` – stored in the `queue_size` parameter.
    /// * `ndarray_port` – upstream port name; stored in `nd_array_port` unless
    ///   empty.
    /// * `max_addr` – number of asyn addresses on the port.
    /// * `param_change_tx` – channel used to forward parameter writes.
    /// * `processor` – given the chance to register extra parameters last.
    /// * `array_data` – optional array slot; enables `STD_ARRAY_DATA`.
    ///
    /// Returns an error if any parameter cannot be created or initialised.
    fn new<P: NDPluginProcess>(
        port_name: &str,
        plugin_type_name: &str,
        queue_size: usize,
        ndarray_port: &str,
        max_addr: usize,
        param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
        processor: &mut P,
        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    ) -> AsynResult<Self> {
        let mut base = PortDriverBase::new(
            port_name,
            max_addr,
            PortFlags {
                can_block: true,
                ..Default::default()
            },
        );

        // Common parameter sets are created before any plugin-specific ones.
        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
        let plugin_params = PluginBaseParams::create(&mut base)?;

        // Initial values: callbacks disabled, non-blocking, empty queue stats.
        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
        // File-related parameters start out cleared.
        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
        base.set_int32_param(ndarray_params.capture, 0, 0)?;
        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;

        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
        // Only record an upstream port when one was actually given.
        if !ndarray_port.is_empty() {
            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
        }

        // The array-data parameter exists only for plugins that expose a slot.
        let std_array_data_param = if array_data.is_some() {
            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
        } else {
            None
        };

        // Plugin-specific parameters come last.
        processor.register_params(&mut base)?;

        Ok(Self {
            base,
            ndarray_params,
            plugin_params,
            param_change_tx,
            array_data,
            std_array_data_param,
        })
    }
}
732
/// Copies as many leading elements of `src` into `dst` as both slices can
/// hold and returns that count. Elements of `dst` beyond it are untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = usize::min(src.len(), dst.len());
    let (target, _) = dst.split_at_mut(count);
    target.copy_from_slice(&src[..count]);
    count
}
739
/// Copies the common prefix of `src` into `dst`, converting every element by
/// way of `f64`, and returns the number of elements written.
///
/// Conversions follow Rust `as`-cast rules: float-to-integer casts truncate
/// toward zero and saturate at the target's bounds.
fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
where
    S: CastToF64 + Copy,
    D: CastFromF64 + Copy,
{
    let mut written = 0;
    // `zip` stops at the shorter slice, so this covers exactly the common prefix.
    for (slot, &sample) in dst.iter_mut().zip(src) {
        *slot = D::cast_from_f64(sample.cast_to_f64());
        written += 1;
    }
    written
}

/// Numeric element types that can be widened to `f64` with an `as` cast.
trait CastToF64 {
    fn cast_to_f64(self) -> f64;
}

/// Implements [`CastToF64`] with a plain `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64 {
    ($($ty:ty),* $(,)?) => {
        $(
            impl CastToF64 for $ty {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )*
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32, f64);

/// Numeric element types that can be produced from an `f64` with an `as` cast.
trait CastFromF64 {
    fn cast_from_f64(v: f64) -> Self;
}

/// Implements [`CastFromF64`] with a plain `as` cast for each listed type.
macro_rules! impl_cast_from_f64 {
    ($($ty:ty),* $(,)?) => {
        $(
            impl CastFromF64 for $ty {
                fn cast_from_f64(v: f64) -> Self {
                    v as $ty
                }
            }
        )*
    };
}

impl_cast_from_f64!(i8, i16, i32, f32, f64);
839
/// Body shared by the `read_*_array` methods of the port driver.
///
/// Arguments: the driver (`$self`), the destination buffer (`$buf`), the
/// `NDDataBuffer` variant whose element type already matches the buffer
/// (copied verbatim), then every other variant (converted element by element).
///
/// Expands to `Ok(n)` with the number of elements written, and returns
/// `Ok(0)` early when the driver has no array slot or the slot is empty.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        let Some(handle) = &$self.array_data else {
            return Ok(0);
        };
        // The slot stays locked for the duration of the copy.
        let guard = handle.lock();
        let Some(array) = &*guard else {
            return Ok(0);
        };
        let copied = match &array.data {
            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
        };
        Ok(copied)
    }};
}
861
862impl PortDriver for PluginPortDriver {
863 fn base(&self) -> &PortDriverBase {
864 &self.base
865 }
866
867 fn base_mut(&mut self) -> &mut PortDriverBase {
868 &mut self.base
869 }
870
871 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
872 let reason = user.reason;
873 let addr = user.addr;
874 self.base.set_int32_param(reason, addr, value)?;
875 self.base.call_param_callbacks(addr)?;
876 let _ = self
877 .param_change_tx
878 .try_send((reason, addr, ParamChangeValue::Int32(value)));
879 Ok(())
880 }
881
882 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
883 let reason = user.reason;
884 let addr = user.addr;
885 self.base.set_float64_param(reason, addr, value)?;
886 self.base.call_param_callbacks(addr)?;
887 let _ = self
888 .param_change_tx
889 .try_send((reason, addr, ParamChangeValue::Float64(value)));
890 Ok(())
891 }
892
893 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
894 let reason = user.reason;
895 let addr = user.addr;
896 let s = String::from_utf8_lossy(data).into_owned();
897 self.base.set_string_param(reason, addr, s.clone())?;
898 self.base.call_param_callbacks(addr)?;
899 let _ = self
900 .param_change_tx
901 .try_send((reason, addr, ParamChangeValue::Octet(s)));
902 Ok(())
903 }
904
905 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
906 impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
907 }
908
909 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
910 impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
911 }
912
913 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
914 impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
915 }
916
917 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
918 impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
919 }
920
921 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
922 impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
923 }
924}
925
/// Cloneable handle to a running plugin, returned by the
/// `create_plugin_runtime*` functions.
#[derive(Clone)]
pub struct PluginRuntimeHandle {
    /// Handle to the asyn port runtime hosting the plugin's driver.
    port_runtime: PortRuntimeHandle,
    /// Sender used to feed arrays into this plugin.
    array_sender: NDArraySender,
    /// Outputs this plugin publishes its processed arrays to.
    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Asyn port name of the plugin.
    port_name: String,
    /// Indices of the common NDArray driver parameters.
    pub ndarray_params: NDArrayDriverParams,
    /// Indices of the common plugin parameters.
    pub plugin_params: PluginBaseParams,
}

impl PluginRuntimeHandle {
    /// Handle to the asyn port runtime hosting the plugin's driver.
    pub fn port_runtime(&self) -> &PortRuntimeHandle {
        &self.port_runtime
    }

    /// Sender used to feed arrays into this plugin.
    pub fn array_sender(&self) -> &NDArraySender {
        &self.array_sender
    }

    /// Shared output set; downstream senders are added to it.
    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
        &self.array_output
    }

    /// Asyn port name of the plugin.
    pub fn port_name(&self) -> &str {
        &self.port_name
    }
}
954
955pub fn create_plugin_runtime<P: NDPluginProcess>(
962 port_name: &str,
963 processor: P,
964 pool: Arc<NDArrayPool>,
965 queue_size: usize,
966 ndarray_port: &str,
967 wiring: Arc<WiringRegistry>,
968) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
969 create_plugin_runtime_multi_addr(
970 port_name,
971 processor,
972 pool,
973 queue_size,
974 ndarray_port,
975 wiring,
976 1,
977 )
978}
979
/// Creates a plugin runtime whose port exposes `max_addr` asyn addresses.
///
/// Builds the port driver, starts the port runtime, creates the input array
/// channel and spawns the `plugin-data-<port_name>` thread running
/// `plugin_data_loop`.
///
/// * `port_name` – asyn port name of the plugin.
/// * `processor` – the plugin's processing logic; moved into shared state.
/// * `pool` – array pool handed to the processor.
/// * `queue_size` – capacity of the input array channel.
/// * `ndarray_port` – name of the initial upstream port (may be empty).
/// * `wiring` – registry used to rewire the plugin when its upstream changes.
/// * `max_addr` – number of asyn addresses on the port.
///
/// Returns the runtime handle and the data thread's join handle.
///
/// # Panics
///
/// Panics if the port driver cannot be created or the data thread cannot be
/// spawned.
pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
    port_name: &str,
    mut processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
    max_addr: usize,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    // Carries parameter writes from the port driver to the data loop.
    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);

    // Both are read before the processor is borrowed by the driver constructor.
    let plugin_type_name = processor.plugin_type().to_string();
    let array_data = processor.array_data_handle();

    let driver = PluginPortDriver::new(
        port_name,
        &plugin_type_name,
        queue_size,
        ndarray_port,
        max_addr,
        param_tx,
        &mut processor,
        array_data,
    )
    .expect("failed to create plugin port driver");

    // Copy out the parameter indices before the driver is moved into the
    // port runtime.
    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
    let min_callback_time_reason = driver.plugin_params.min_callback_time;
    let sort_mode_reason = driver.plugin_params.sort_mode;
    let sort_time_reason = driver.plugin_params.sort_time;
    let sort_size_reason = driver.plugin_params.sort_size;
    let ndarray_params = driver.ndarray_params;
    let plugin_params = driver.plugin_params;
    let std_array_data_param = driver.std_array_data_param;

    // The actor's join handle is not kept; only the data thread's is returned.
    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());

    let port_handle = port_runtime.port_handle().clone();

    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);

    // Both flags start cleared: callbacks disabled, non-blocking mode.
    let enabled = Arc::new(AtomicBool::new(false));
    let blocking_mode = Arc::new(AtomicBool::new(false));

    // One clone goes into the shared state, the other into the handle.
    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
    let array_output_for_handle = array_output.clone();
    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
        processor,
        output: array_output,
        pool,
        ndarray_params,
        plugin_params,
        port_handle,
        array_counter: 0,
        std_array_data_param,
        min_callback_time: 0.0,
        last_process_time: None,
        sort_mode: 0,
        sort_time: 0.0,
        sort_size: 10,
        sort_buffer: SortBuffer::new(),
    }));

    // Lets the sender run the processor directly on the caller's thread.
    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
        inner: shared.clone(),
    });

    // Clones for the data loop; the originals move into the sender below.
    let data_enabled = enabled.clone();
    let data_blocking = blocking_mode.clone();

    // Taken before `with_blocking_support` consumes the sender.
    let dropped_count = array_sender.dropped_count_shared();
    let queue_tx = array_sender.tx_clone();

    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);

    let nd_array_port_reason = plugin_params.nd_array_port;
    let sender_port_name = port_name.to_string();
    let initial_upstream = ndarray_port.to_string();

    let data_jh = thread::Builder::new()
        .name(format!("plugin-data-{port_name}"))
        .spawn(move || {
            plugin_data_loop(
                shared,
                array_rx,
                param_rx,
                enable_callbacks_reason,
                blocking_callbacks_reason,
                min_callback_time_reason,
                sort_mode_reason,
                sort_time_reason,
                sort_size_reason,
                data_enabled,
                data_blocking,
                nd_array_port_reason,
                sender_port_name,
                initial_upstream,
                wiring,
                dropped_count,
                queue_tx,
            );
        })
        .expect("failed to spawn plugin data thread");

    let handle = PluginRuntimeHandle {
        port_runtime,
        array_sender,
        array_output: array_output_for_handle,
        port_name: port_name.to_string(),
        ndarray_params,
        plugin_params,
    };

    (handle, data_jh)
}
1111
1112fn plugin_data_loop<P: NDPluginProcess>(
1113 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1114 mut array_rx: NDArrayReceiver,
1115 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1116 enable_callbacks_reason: usize,
1117 blocking_callbacks_reason: usize,
1118 min_callback_time_reason: usize,
1119 sort_mode_reason: usize,
1120 sort_time_reason: usize,
1121 sort_size_reason: usize,
1122 enabled: Arc<AtomicBool>,
1123 blocking_mode: Arc<AtomicBool>,
1124 nd_array_port_reason: usize,
1125 sender_port_name: String,
1126 initial_upstream: String,
1127 wiring: Arc<WiringRegistry>,
1128 dropped_count: Arc<std::sync::atomic::AtomicU64>,
1129 queue_tx: tokio::sync::mpsc::Sender<super::channel::ArrayMessage>,
1130) {
1131 let mut current_upstream = initial_upstream;
1132 let rt = tokio::runtime::Builder::new_current_thread()
1133 .enable_all()
1134 .build()
1135 .unwrap();
1136 rt.block_on(async {
1137 let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1140 let mut sort_flush_active = false;
1141
1142 loop {
1143 tokio::select! {
1144 msg = array_rx.recv_msg() => {
1145 match msg {
1146 Some(msg) => {
1147 if !blocking_mode.load(Ordering::Acquire) {
1150 shared.lock().process_and_publish(&msg.array);
1151 }
1152 let guard = shared.lock();
1156 let queue_free = queue_tx.capacity() as i32;
1157 let dropped = dropped_count.load(Ordering::Relaxed) as i32;
1158 guard.port_handle.write_int32_no_wait(
1159 guard.plugin_params.queue_use, 0, queue_free,
1160 );
1161 guard.port_handle.write_int32_no_wait(
1162 guard.plugin_params.dropped_arrays, 0, dropped,
1163 );
1164 drop(guard);
1165 }
1166 None => break,
1167 }
1168 }
1169 param = param_rx.recv() => {
1170 match param {
1171 Some((reason, addr, value)) => {
1172 if reason == enable_callbacks_reason {
1173 enabled.store(value.as_i32() != 0, Ordering::Release);
1174 }
1175 if reason == blocking_callbacks_reason {
1176 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1177 }
1178 if reason == min_callback_time_reason {
1180 shared.lock().min_callback_time = value.as_f64();
1181 }
1182 if reason == sort_mode_reason {
1184 let mode = value.as_i32();
1185 let mut guard = shared.lock();
1186 guard.sort_mode = mode;
1187 if mode == 0 {
1188 guard.flush_sort_buffer();
1190 sort_flush_active = false;
1191 } else {
1192 sort_flush_active = guard.sort_time > 0.0;
1194 if sort_flush_active {
1195 let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1196 sort_flush_interval = tokio::time::interval(dur);
1197 }
1198 }
1199 drop(guard);
1200 }
1201 if reason == sort_time_reason {
1202 let t = value.as_f64();
1203 let mut guard = shared.lock();
1204 guard.sort_time = t;
1205 if guard.sort_mode != 0 && t > 0.0 {
1206 sort_flush_active = true;
1207 let dur = std::time::Duration::from_secs_f64(t);
1208 sort_flush_interval = tokio::time::interval(dur);
1209 } else {
1210 sort_flush_active = false;
1211 }
1212 drop(guard);
1213 }
1214 if reason == sort_size_reason {
1215 shared.lock().sort_size = value.as_i32();
1216 }
1217 if reason == nd_array_port_reason {
1219 if let Some(new_port) = value.as_string() {
1220 if new_port != current_upstream {
1221 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1222 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1223 eprintln!("NDArrayPort rewire failed: {e}");
1224 current_upstream = old;
1225 }
1226 }
1227 }
1228 }
1229 let snapshot = PluginParamSnapshot {
1230 enable_callbacks: enabled.load(Ordering::Acquire),
1231 reason,
1232 addr,
1233 value,
1234 };
1235 let mut guard = shared.lock();
1236 let t0 = std::time::Instant::now();
1237 let result = guard.processor.on_param_change(reason, &snapshot);
1238 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1239 if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1240 guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
1241 }
1242 drop(guard);
1243 }
1244 None => break,
1245 }
1246 }
1247 _ = sort_flush_interval.tick(), if sort_flush_active => {
1248 shared.lock().flush_sort_buffer();
1249 }
1250 }
1251 }
1252 });
1253}
1254
1255pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1257 upstream.array_output().lock().add(downstream_sender);
1258}
1259
1260pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1262 port_name: &str,
1263 mut processor: P,
1264 pool: Arc<NDArrayPool>,
1265 queue_size: usize,
1266 output: NDArrayOutput,
1267 ndarray_port: &str,
1268 wiring: Arc<WiringRegistry>,
1269) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1270 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1271
1272 let plugin_type_name = processor.plugin_type().to_string();
1273 let array_data = processor.array_data_handle();
1274 let driver = PluginPortDriver::new(
1275 port_name,
1276 &plugin_type_name,
1277 queue_size,
1278 ndarray_port,
1279 1,
1280 param_tx,
1281 &mut processor,
1282 array_data,
1283 )
1284 .expect("failed to create plugin port driver");
1285
1286 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1287 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1288 let min_callback_time_reason = driver.plugin_params.min_callback_time;
1289 let sort_mode_reason = driver.plugin_params.sort_mode;
1290 let sort_time_reason = driver.plugin_params.sort_time;
1291 let sort_size_reason = driver.plugin_params.sort_size;
1292 let ndarray_params = driver.ndarray_params;
1293 let plugin_params = driver.plugin_params;
1294 let std_array_data_param = driver.std_array_data_param;
1295
1296 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1297
1298 let port_handle = port_runtime.port_handle().clone();
1299
1300 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1301
1302 let enabled = Arc::new(AtomicBool::new(false));
1303 let blocking_mode = Arc::new(AtomicBool::new(false));
1304
1305 let array_output = Arc::new(parking_lot::Mutex::new(output));
1306 let array_output_for_handle = array_output.clone();
1307 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1308 processor,
1309 output: array_output,
1310 pool,
1311 ndarray_params,
1312 plugin_params,
1313 port_handle,
1314 array_counter: 0,
1315 std_array_data_param,
1316 min_callback_time: 0.0,
1317 last_process_time: None,
1318 sort_mode: 0,
1319 sort_time: 0.0,
1320 sort_size: 10,
1321 sort_buffer: SortBuffer::new(),
1322 }));
1323
1324 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1325 inner: shared.clone(),
1326 });
1327
1328 let data_enabled = enabled.clone();
1329 let data_blocking = blocking_mode.clone();
1330
1331 let dropped_count = array_sender.dropped_count_shared();
1333 let queue_tx = array_sender.tx_clone();
1334
1335 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1336
1337 let nd_array_port_reason = plugin_params.nd_array_port;
1339 let sender_port_name = port_name.to_string();
1340 let initial_upstream = ndarray_port.to_string();
1341
1342 let data_jh = thread::Builder::new()
1343 .name(format!("plugin-data-{port_name}"))
1344 .spawn(move || {
1345 plugin_data_loop(
1346 shared,
1347 array_rx,
1348 param_rx,
1349 enable_callbacks_reason,
1350 blocking_callbacks_reason,
1351 min_callback_time_reason,
1352 sort_mode_reason,
1353 sort_time_reason,
1354 sort_size_reason,
1355 data_enabled,
1356 data_blocking,
1357 nd_array_port_reason,
1358 sender_port_name,
1359 initial_upstream,
1360 wiring,
1361 dropped_count,
1362 queue_tx,
1363 );
1364 })
1365 .expect("failed to spawn plugin data thread");
1366
1367 let handle = PluginRuntimeHandle {
1368 port_runtime,
1369 array_sender,
1370 array_output: array_output_for_handle,
1371 port_name: port_name.to_string(),
1372 ndarray_params,
1373 plugin_params,
1374 };
1375
1376 (handle, data_jh)
1377}
1378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::ndarray_channel;

    /// Processor that emits a copy of every array it is given.
    struct PassthroughProcessor;

    impl NDPluginProcess for PassthroughProcessor {
        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            let copy = Arc::new(array.clone());
            ProcessResult::arrays(vec![copy])
        }

        fn plugin_type(&self) -> &str {
            "Passthrough"
        }
    }

    /// Processor that counts the arrays it sees and emits nothing.
    struct SinkProcessor {
        count: usize,
    }

    impl NDPluginProcess for SinkProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count += 1;
            ProcessResult::empty()
        }

        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }

    // ---- shared helpers -------------------------------------------------

    /// Builds a one-dimensional, 4-element UInt8 array tagged with `id`.
    fn make_test_array(id: i32) -> Arc<NDArray> {
        let mut array = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        array.unique_id = id;
        Arc::new(array)
    }

    /// Fresh, empty wiring registry.
    fn test_wiring() -> Arc<WiringRegistry> {
        Arc::new(WiringRegistry::new())
    }

    /// Array pool constructed with the argument every test uses.
    fn test_pool() -> Arc<NDArrayPool> {
        Arc::new(NDArrayPool::new(1_000_000))
    }

    /// Sleeps the current thread for `ms` milliseconds.
    fn pause_ms(ms: u64) {
        std::thread::sleep(std::time::Duration::from_millis(ms));
    }

    /// Writes `value` to the enable-callbacks parameter, then waits `settle_ms`.
    fn set_enable_callbacks(handle: &PluginRuntimeHandle, value: i32, settle_ms: u64) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, value)
            .unwrap();
        pause_ms(settle_ms);
    }

    /// Writes 1 to the enable-callbacks parameter and waits 10 ms.
    fn enable_callbacks(handle: &PluginRuntimeHandle) {
        set_enable_callbacks(handle, 1, 10);
    }

    /// Writes `value` to the blocking-callbacks parameter, then waits 50 ms.
    fn set_blocking_callbacks(handle: &PluginRuntimeHandle, value: i32) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, value)
            .unwrap();
        pause_ms(50);
    }

    /// Creates a runtime for `processor` whose only output is a fresh
    /// "DOWNSTREAM" channel of size 10, and returns that channel's receiver
    /// together with the runtime handle and data-thread join handle.
    fn runtime_with_downstream<P: NDPluginProcess>(
        port_name: &str,
        processor: P,
    ) -> (
        NDArrayReceiver,
        PluginRuntimeHandle,
        std::thread::JoinHandle<()>,
    ) {
        let pool = test_pool();
        let (downstream_sender, downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let mut output = NDArrayOutput::new();
        output.add(downstream_sender);

        let (handle, data_jh) = create_plugin_runtime_with_output(
            port_name,
            processor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        (downstream_rx, handle, data_jh)
    }

    /// Blocks until the next array arrives on `rx` and returns its `unique_id`.
    fn next_unique_id(rx: &mut NDArrayReceiver) -> i32 {
        rx.blocking_recv().unwrap().unique_id
    }

    /// Single-threaded tokio runtime with all drivers enabled.
    fn current_thread_rt() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    /// Returns `true` when `rx.recv()` does not complete within `wait_ms`.
    fn nothing_arrives_within(rx: &mut NDArrayReceiver, wait_ms: u64) -> bool {
        let rt = current_thread_rt();
        let outcome = rt.block_on(async {
            tokio::time::timeout(std::time::Duration::from_millis(wait_ms), rx.recv()).await
        });
        outcome.is_err()
    }

    // ---- runtime tests --------------------------------------------------

    #[test]
    fn test_passthrough_runtime() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("PASS1", PassthroughProcessor);
        enable_callbacks(&handle);

        handle.array_sender().send(make_test_array(42));

        assert_eq!(next_unique_id(&mut downstream_rx), 42);
    }

    #[test]
    fn test_sink_runtime() {
        let (handle, _data_jh) = create_plugin_runtime(
            "SINK1",
            SinkProcessor { count: 0 },
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        for id in [1, 2] {
            handle.array_sender().send(make_test_array(id));
        }

        pause_ms(50);

        assert_eq!(handle.port_name(), "SINK1");
    }

    #[test]
    fn test_plugin_type_param() {
        let (handle, _data_jh) = create_plugin_runtime(
            "TYPE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        assert_eq!(handle.port_name(), "TYPE_TEST");
        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
    }

    #[test]
    fn test_shutdown_on_handle_drop() {
        let (handle, data_jh) = create_plugin_runtime(
            "SHUTDOWN_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        // Drop the handle first, then the extra sender clone, then join.
        let sender = handle.array_sender().clone();
        drop(handle);
        drop(sender);

        assert!(data_jh.join().is_ok());
    }

    #[test]
    fn test_dropped_count_when_queue_full() {
        /// Processor that sleeps 100 ms per array.
        struct SlowProcessor;

        impl NDPluginProcess for SlowProcessor {
            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                pause_ms(100);
                ProcessResult::empty()
            }

            fn plugin_type(&self) -> &str {
                "Slow"
            }
        }

        let (handle, _data_jh) = create_plugin_runtime(
            "DROP_TEST",
            SlowProcessor,
            test_pool(),
            1,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        // Ten sends into a queue of size 1.
        (0..10).for_each(|i| {
            handle.array_sender().send(make_test_array(i));
        });

        assert!(handle.array_sender().dropped_count() > 0);
    }

    #[test]
    fn test_blocking_callbacks_basic() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("BLOCK_TEST", PassthroughProcessor);
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        handle.array_sender().send(make_test_array(42));

        assert_eq!(next_unique_id(&mut downstream_rx), 42);
    }

    #[test]
    fn test_blocking_to_nonblocking_switch() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("SWITCH_TEST", PassthroughProcessor);
        enable_callbacks(&handle);

        set_blocking_callbacks(&handle, 1);
        handle.array_sender().send(make_test_array(1));
        assert_eq!(next_unique_id(&mut downstream_rx), 1);

        set_blocking_callbacks(&handle, 0);
        handle.array_sender().send(make_test_array(2));
        assert_eq!(next_unique_id(&mut downstream_rx), 2);
    }

    #[test]
    fn test_enable_callbacks_disables_processing() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("ENABLE_TEST", PassthroughProcessor);

        set_enable_callbacks(&handle, 0, 50);

        handle.array_sender().send(make_test_array(99));

        assert!(
            nothing_arrives_within(&mut downstream_rx, 100),
            "should not receive array when callbacks disabled"
        );
    }

    #[test]
    fn test_blocking_downstream_receives() {
        let pool = test_pool();

        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
        let mut output = NDArrayOutput::new();
        output.add(ds1);
        output.add(ds2);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "BLOCK_DS_TEST",
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        handle.array_sender().send(make_test_array(77));

        let first_id = next_unique_id(&mut rx1);
        let second_id = next_unique_id(&mut rx2);
        assert_eq!(first_id, 77);
        assert_eq!(second_id, 77);
    }

    #[test]
    fn test_blocking_param_updates() {
        /// Processor that emits a copy of every array, reporting type "ParamTracker".
        struct ParamTracker;

        impl NDPluginProcess for ParamTracker {
            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                ProcessResult::arrays(vec![Arc::new(array.clone())])
            }

            fn plugin_type(&self) -> &str {
                "ParamTracker"
            }
        }

        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("PARAM_TEST", ParamTracker);
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        handle.array_sender().send(make_test_array(1));
        assert_eq!(next_unique_id(&mut downstream_rx), 1);

        // Write enable-callbacks again while blocking mode is on.
        set_enable_callbacks(&handle, 1, 50);

        handle.array_sender().send(make_test_array(2));
        assert_eq!(next_unique_id(&mut downstream_rx), 2);
    }

    #[test]
    fn test_no_panic_in_current_thread_runtime() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("CURRENT_THREAD_TEST", PassthroughProcessor);
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        // Send from inside a current-thread tokio runtime.
        let rt = current_thread_rt();
        rt.block_on(async {
            handle.array_sender().send(make_test_array(99));
        });

        assert_eq!(next_unique_id(&mut downstream_rx), 99);
    }

    // ---- SortBuffer tests -----------------------------------------------

    #[test]
    fn test_sort_buffer_reorders_by_unique_id() {
        let mut buf = SortBuffer::new();

        for id in [3, 1, 2] {
            buf.insert(id, vec![make_test_array(id)], 10);
        }

        assert_eq!(buf.len(), 3);

        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.last_emitted_id, 3);
    }

    #[test]
    fn test_sort_buffer_detects_disordered() {
        let mut buf = SortBuffer::new();

        buf.insert(5, vec![make_test_array(5)], 10);
        buf.drain_all();

        // Insert an id lower than the one already drained.
        buf.insert(3, vec![make_test_array(3)], 10);
        assert_eq!(buf.disordered_arrays, 1);
    }

    #[test]
    fn test_sort_buffer_drops_when_full() {
        let mut buf = SortBuffer::new();

        // Three inserts with a size limit of 2.
        for id in [1, 2, 3] {
            buf.insert(id, vec![make_test_array(id)], 2);
        }

        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped_output_arrays, 1);

        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
    }

    #[test]
    fn test_sort_mode_runtime_integration() {
        let (mut downstream_rx, handle, _data_jh) =
            runtime_with_downstream("SORT_TEST", PassthroughProcessor);
        enable_callbacks(&handle);

        let write_param = |reason, value| {
            handle
                .port_runtime()
                .port_handle()
                .write_int32_blocking(reason, 0, value)
                .unwrap();
        };

        write_param(handle.plugin_params.sort_size, 10);
        write_param(handle.plugin_params.sort_mode, 1);
        pause_ms(50);

        for id in [3, 1, 2] {
            handle.array_sender().send(make_test_array(id));
        }
        pause_ms(100);

        assert!(
            nothing_arrives_within(&mut downstream_rx, 50),
            "arrays should be buffered while sort mode is active"
        );

        // Switch sort mode back to 0 and expect the arrays in id order.
        write_param(handle.plugin_params.sort_mode, 0);
        pause_ms(100);

        let first_id = next_unique_id(&mut downstream_rx);
        let second_id = next_unique_id(&mut downstream_rx);
        let third_id = next_unique_id(&mut downstream_rx);
        assert_eq!(first_id, 1);
        assert_eq!(second_id, 2);
        assert_eq!(third_id, 3);
    }
}