1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
6use asyn_rs::error::AsynResult;
7use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use asyn_rs::user::AsynUser;
11
12use asyn_rs::port_handle::PortHandle;
13
14use crate::ndarray::NDArray;
15use crate::ndarray_pool::NDArrayPool;
16use crate::params::ndarray_driver::NDArrayDriverParams;
17
18use super::channel::{
19 BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
20};
21use super::params::PluginBaseParams;
22use super::wiring::WiringRegistry;
23
/// Value carried by a parameter-change notification that the port driver
/// forwards to the plugin data thread.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// Value written through the Int32 interface.
    Int32(i32),
    /// Value written through the Float64 interface.
    Float64(f64),
    /// Value written through the Octet (string) interface.
    Octet(String),
}

impl ParamChangeValue {
    /// Returns the value as an `i32`.
    ///
    /// Floats go through an `as` cast (truncates toward zero, saturates at the
    /// `i32` bounds, NaN becomes 0). String values always yield `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(int) => int,
            Self::Float64(float) => float as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Returns the value as an `f64`. String values always yield `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Float64(float) => float,
            Self::Int32(int) => int as f64,
            Self::Octet(_) => 0.0,
        }
    }

    /// Returns the string payload, or `None` for numeric values.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
56
/// A parameter write requested by a plugin processor as part of a
/// [`ProcessResult`] or [`ParamChangeResult`].
///
/// Each variant names the parameter index (`reason`), the asyn address
/// (`addr`) and the new value.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamUpdate {
    /// Set an Int32 parameter.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// Set a Float64 parameter.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// Set an Octet (string) parameter.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    /// Set a Float64 array parameter.
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Int32 update at address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Float64 update at address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Octet (string) update at address 0.
    pub fn octet(reason: usize, value: impl Into<String>) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// Int32 update at an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Float64 update at an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// Octet (string) update at an explicit address.
    pub fn octet_addr(reason: usize, addr: i32, value: impl Into<String>) -> Self {
        Self::Octet {
            reason,
            addr,
            value: value.into(),
        }
    }

    /// Float64 array update at address 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Float64 array update at an explicit address.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array {
            reason,
            addr,
            value,
        }
    }
}
131
132pub struct ProcessResult {
134 pub output_arrays: Vec<Arc<NDArray>>,
135 pub param_updates: Vec<ParamUpdate>,
136 pub scatter_index: Option<usize>,
138}
139
140impl ProcessResult {
141 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
143 Self {
144 output_arrays: vec![],
145 param_updates,
146 scatter_index: None,
147 }
148 }
149
150 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
152 Self {
153 output_arrays,
154 param_updates: vec![],
155 scatter_index: None,
156 }
157 }
158
159 pub fn empty() -> Self {
161 Self {
162 output_arrays: vec![],
163 param_updates: vec![],
164 scatter_index: None,
165 }
166 }
167
168 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
170 Self {
171 output_arrays,
172 param_updates: vec![],
173 scatter_index: Some(index),
174 }
175 }
176}
177
178pub struct ParamChangeResult {
180 pub output_arrays: Vec<Arc<NDArray>>,
181 pub param_updates: Vec<ParamUpdate>,
182}
183
184impl ParamChangeResult {
185 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
186 Self {
187 output_arrays: vec![],
188 param_updates,
189 }
190 }
191
192 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
193 Self {
194 output_arrays,
195 param_updates: vec![],
196 }
197 }
198
199 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
200 Self {
201 output_arrays,
202 param_updates,
203 }
204 }
205
206 pub fn empty() -> Self {
207 Self {
208 output_arrays: vec![],
209 param_updates: vec![],
210 }
211 }
212}
213
/// Processing logic supplied by a concrete plugin.
///
/// The runtime owns the implementor behind a mutex and calls it from the
/// plugin data thread (and, through the blocking handle, from whichever
/// thread invokes `BlockingProcessFn::process_and_publish`).
pub trait NDPluginProcess: Send + 'static {
    /// Processes one input array and returns what should be published.
    ///
    /// `pool` is the array pool the runtime was created with.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Name stored in the plugin-type string parameter when the port driver
    /// is constructed.
    fn plugin_type(&self) -> &str;

    /// Hook for creating plugin-specific parameters on the port driver base.
    ///
    /// Called once during driver construction, after the shared NDArray and
    /// plugin-base parameters have been created. The default does nothing.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called by the data loop for every parameter write it receives,
    /// including writes to the base parameters.
    ///
    /// `_reason` equals `_params.reason`. The default returns an empty result.
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Optional shared slot holding the latest array for the array-read
    /// interfaces.
    ///
    /// When this returns `Some`, the driver creates a `STD_ARRAY_DATA`
    /// parameter and the `read_*_array` methods copy out of the slot.
    /// The default returns `None`.
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
247
/// Description of a single parameter write, passed to
/// `NDPluginProcess::on_param_change`.
pub struct PluginParamSnapshot {
    /// State of the enable-callbacks flag at the time the change is handled.
    pub enable_callbacks: bool,
    /// Parameter index that was written.
    pub reason: usize,
    /// Asyn address the write was directed at.
    pub addr: i32,
    /// The value that was written.
    pub value: ParamChangeValue,
}
258
259struct SortBuffer {
265 entries: BTreeMap<i32, Vec<Arc<NDArray>>>,
267 last_emitted_id: i32,
269 disordered_arrays: i32,
271 dropped_output_arrays: i32,
273}
274
275impl SortBuffer {
276 fn new() -> Self {
277 Self {
278 entries: BTreeMap::new(),
279 last_emitted_id: 0,
280 disordered_arrays: 0,
281 dropped_output_arrays: 0,
282 }
283 }
284
285 fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) {
287 if unique_id < self.last_emitted_id {
288 self.disordered_arrays += 1;
289 }
290 self.entries.entry(unique_id).or_default().extend(arrays);
291
292 while sort_size > 0 && self.entries.len() as i32 > sort_size {
294 if let Some((&oldest_key, _)) = self.entries.iter().next() {
295 self.entries.remove(&oldest_key);
296 self.dropped_output_arrays += 1;
297 }
298 }
299 }
300
301 fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
303 let entries: Vec<_> = std::mem::take(&mut self.entries).into_iter().collect();
304 if let Some(&(last_id, _)) = entries.last() {
305 self.last_emitted_id = last_id;
306 }
307 entries
308 }
309
310 fn len(&self) -> i32 {
312 self.entries.len() as i32
313 }
314}
315
/// Processor state shared (behind a mutex) between the data thread and the
/// blocking-callback path.
struct SharedProcessorInner<P: NDPluginProcess> {
    /// Plugin-specific processing implementation.
    processor: P,
    /// Downstream outputs that result arrays are published to.
    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Pool handed to `process_array`.
    pool: Arc<NDArrayPool>,
    /// Indices of the shared NDArray driver parameters.
    ndarray_params: NDArrayDriverParams,
    /// Indices of the plugin-base parameters.
    plugin_params: PluginBaseParams,
    /// Handle used to write parameters and raise interrupts on the port.
    port_handle: PortHandle,
    /// Count of results that had an array to report on.
    array_counter: i32,
    /// Index of the `STD_ARRAY_DATA` parameter, if it was created.
    std_array_data_param: Option<usize>,
    /// Minimum time in seconds between processed arrays; `<= 0` disables throttling.
    min_callback_time: f64,
    /// Start time of the most recent `process_array` call.
    last_process_time: Option<std::time::Instant>,
    /// Non-zero routes output arrays through `sort_buffer`.
    sort_mode: i32,
    /// Requested period in seconds between timed flushes of the sort buffer.
    sort_time: f64,
    /// Size limit passed to `SortBuffer::insert`.
    sort_size: i32,
    /// Arrays waiting to be emitted in id order.
    sort_buffer: SortBuffer,
    /// Value of `array_counter` when the rate was last computed.
    rate_last_counter: i32,
    /// Time the rate was last computed.
    rate_last_time: std::time::Instant,
}
345
346impl<P: NDPluginProcess> SharedProcessorInner<P> {
347 fn should_throttle(&self) -> bool {
348 if self.min_callback_time <= 0.0 {
349 return false;
350 }
351 if let Some(last) = self.last_process_time {
352 last.elapsed().as_secs_f64() < self.min_callback_time
353 } else {
354 false
355 }
356 }
357
358 fn process_and_publish(&mut self, array: &NDArray) {
359 if self.should_throttle() {
360 return;
361 }
362 let t0 = std::time::Instant::now();
363 let result = self.processor.process_array(array, &self.pool);
364 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
365 self.last_process_time = Some(t0);
366
367 if self.sort_mode != 0 && !result.output_arrays.is_empty() {
368 let unique_id = array.unique_id;
370 self.sort_buffer
371 .insert(unique_id, result.output_arrays, self.sort_size);
372 self.update_sort_params();
374 if !result.param_updates.is_empty() {
376 self.publish_result(
377 vec![],
378 result.param_updates,
379 result.scatter_index,
380 Some(array),
381 elapsed_ms,
382 );
383 }
384 } else {
385 self.publish_result(
386 result.output_arrays,
387 result.param_updates,
388 result.scatter_index,
389 Some(array),
390 elapsed_ms,
391 );
392 }
393 }
394
395 fn flush_sort_buffer(&mut self) {
397 let entries = self.sort_buffer.drain_all();
398 for (_unique_id, arrays) in entries {
399 self.publish_result(arrays, vec![], None, None, 0.0);
400 }
401 self.update_sort_params();
402 }
403
404 fn update_sort_params(&self) {
406 let sort_free = self.sort_size - self.sort_buffer.len();
407 self.port_handle
408 .write_int32_no_wait(self.plugin_params.sort_free, 0, sort_free);
409 self.port_handle.write_int32_no_wait(
410 self.plugin_params.disordered_arrays,
411 0,
412 self.sort_buffer.disordered_arrays,
413 );
414 self.port_handle.write_int32_no_wait(
415 self.plugin_params.dropped_output_arrays,
416 0,
417 self.sort_buffer.dropped_output_arrays,
418 );
419 }
420
421 fn publish_result(
422 &mut self,
423 output_arrays: Vec<Arc<NDArray>>,
424 param_updates: Vec<ParamUpdate>,
425 scatter_index: Option<usize>,
426 fallback_array: Option<&NDArray>,
427 elapsed_ms: f64,
428 ) {
429 let output = self.output.lock();
430 for out in &output_arrays {
431 if let Some(idx) = scatter_index {
432 output.publish_to(idx, out.clone());
433 } else {
434 output.publish(out.clone());
435 }
436 }
437 drop(output);
438
439 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
440 self.array_counter += 1;
441
442 if let Some(param) = self.std_array_data_param {
445 use crate::ndarray::NDDataBuffer;
446 use asyn_rs::param::ParamValue;
447 let value = match &report_arr.data {
448 NDDataBuffer::I8(v) => {
449 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
450 }
451 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
452 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
453 ))),
454 NDDataBuffer::I16(v) => {
455 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
456 }
457 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
458 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
459 ))),
460 NDDataBuffer::I32(v) => {
461 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
462 }
463 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
464 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
465 ))),
466 NDDataBuffer::I64(v) => {
467 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
468 }
469 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
470 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
471 ))),
472 NDDataBuffer::F32(v) => {
473 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
474 }
475 NDDataBuffer::F64(v) => {
476 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
477 }
478 };
479 if let Some(value) = value {
480 let ts = report_arr.timestamp.to_system_time();
481 self.port_handle
482 .interrupts()
483 .notify(asyn_rs::interrupt::InterruptValue {
484 reason: param,
485 addr: 0,
486 value,
487 timestamp: ts,
488 uint32_changed_mask: 0,
489 });
490 }
491 }
492
493 let info = report_arr.info();
494 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
495 self.port_handle.write_int32_no_wait(
496 self.ndarray_params.array_counter,
497 0,
498 self.array_counter,
499 );
500 self.port_handle.write_int32_no_wait(
501 self.ndarray_params.unique_id,
502 0,
503 report_arr.unique_id,
504 );
505 self.port_handle.write_int32_no_wait(
506 self.ndarray_params.n_dimensions,
507 0,
508 report_arr.dims.len() as i32,
509 );
510 self.port_handle.write_int32_no_wait(
511 self.ndarray_params.array_size_x,
512 0,
513 info.x_size as i32,
514 );
515 self.port_handle.write_int32_no_wait(
516 self.ndarray_params.array_size_y,
517 0,
518 info.y_size as i32,
519 );
520 self.port_handle.write_int32_no_wait(
521 self.ndarray_params.array_size_z,
522 0,
523 info.color_size as i32,
524 );
525 self.port_handle.write_int32_no_wait(
526 self.ndarray_params.array_size,
527 0,
528 info.total_bytes as i32,
529 );
530 self.port_handle.write_int32_no_wait(
531 self.ndarray_params.data_type,
532 0,
533 report_arr.data.data_type() as i32,
534 );
535 self.port_handle
536 .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
537
538 let ts_f64 = report_arr.timestamp.as_f64();
539 self.port_handle
540 .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
541 self.port_handle.write_int32_no_wait(
542 self.ndarray_params.epics_ts_sec,
543 0,
544 report_arr.timestamp.sec as i32,
545 );
546 self.port_handle.write_int32_no_wait(
547 self.ndarray_params.epics_ts_nsec,
548 0,
549 report_arr.timestamp.nsec as i32,
550 );
551 }
552
553 self.port_handle
554 .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
555
556 let now = std::time::Instant::now();
558 let dt = now.duration_since(self.rate_last_time).as_secs_f64();
559 if dt >= 1.0 {
560 let delta = self.array_counter - self.rate_last_counter;
561 let rate = if delta > 0 { delta as f64 / dt } else { 0.0 };
562 self.rate_last_counter = self.array_counter;
563 self.rate_last_time = now;
564 self.port_handle
565 .write_float64_no_wait(self.ndarray_params.array_rate, 0, rate);
566 }
567
568 use asyn_rs::request::ParamSetValue;
571
572 let mut addr0_updates: Vec<ParamSetValue> = Vec::new();
573 let mut extra_addr_map: std::collections::HashMap<i32, Vec<ParamSetValue>> =
574 std::collections::HashMap::new();
575
576 for update in ¶m_updates {
577 match update {
578 ParamUpdate::Int32 {
579 reason,
580 addr,
581 value,
582 } => {
583 let pv = ParamSetValue::Int32 {
584 reason: *reason,
585 addr: *addr,
586 value: *value,
587 };
588 if *addr == 0 {
589 addr0_updates.push(pv);
590 } else {
591 extra_addr_map.entry(*addr).or_default().push(pv);
592 }
593 }
594 ParamUpdate::Float64 {
595 reason,
596 addr,
597 value,
598 } => {
599 let pv = ParamSetValue::Float64 {
600 reason: *reason,
601 addr: *addr,
602 value: *value,
603 };
604 if *addr == 0 {
605 addr0_updates.push(pv);
606 } else {
607 extra_addr_map.entry(*addr).or_default().push(pv);
608 }
609 }
610 ParamUpdate::Octet {
611 reason,
612 addr,
613 value,
614 } => {
615 let pv = ParamSetValue::Octet {
616 reason: *reason,
617 addr: *addr,
618 value: value.clone(),
619 };
620 if *addr == 0 {
621 addr0_updates.push(pv);
622 } else {
623 extra_addr_map.entry(*addr).or_default().push(pv);
624 }
625 }
626 ParamUpdate::Float64Array {
627 reason,
628 addr,
629 value,
630 } => {
631 let pv = ParamSetValue::Float64Array {
632 reason: *reason,
633 addr: *addr,
634 value: value.clone(),
635 };
636 if *addr == 0 {
637 addr0_updates.push(pv);
638 } else {
639 extra_addr_map.entry(*addr).or_default().push(pv);
640 }
641 }
642 }
643 }
644
645 self.port_handle.set_params_and_notify(0, addr0_updates);
646 for (addr, updates) in extra_addr_map {
647 self.port_handle.set_params_and_notify(addr, updates);
648 }
649 }
650}
651
/// Adapter that exposes the shared processor through the
/// `BlockingProcessFn` trait object handed to the array sender.
struct BlockingProcessorHandle<P: NDPluginProcess> {
    /// Same shared state the data thread uses.
    inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
}

impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
    /// Locks the shared processor and processes `array` on the calling thread.
    fn process_and_publish(&self, array: &NDArray) {
        self.inner.lock().process_and_publish(array);
    }
}
663
/// Asyn port driver that backs a plugin: owns the parameter library and
/// forwards parameter writes to the plugin data thread.
#[allow(dead_code)]
pub struct PluginPortDriver {
    /// Underlying asyn port driver state.
    base: PortDriverBase,
    /// Indices of the shared NDArray driver parameters.
    ndarray_params: NDArrayDriverParams,
    /// Indices of the plugin-base parameters.
    plugin_params: PluginBaseParams,
    /// Channel carrying `(reason, addr, value)` for every accepted write.
    param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
    /// Slot with the latest array served by the `read_*_array` methods, if
    /// the processor provided one.
    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    /// Index of the `STD_ARRAY_DATA` parameter; `Some` exactly when
    /// `array_data` was `Some` at construction.
    std_array_data_param: Option<usize>,
}
676
677impl PluginPortDriver {
678 fn new<P: NDPluginProcess>(
679 port_name: &str,
680 plugin_type_name: &str,
681 queue_size: usize,
682 ndarray_port: &str,
683 max_addr: usize,
684 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
685 processor: &mut P,
686 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
687 ) -> AsynResult<Self> {
688 let mut base = PortDriverBase::new(
689 port_name,
690 max_addr,
691 PortFlags {
692 can_block: true,
693 ..Default::default()
694 },
695 );
696
697 let ndarray_params = NDArrayDriverParams::create(&mut base)?;
698 let plugin_params = PluginBaseParams::create(&mut base)?;
699
700 base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
702 base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
703 base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
704 base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
705 base.set_int32_param(plugin_params.queue_use, 0, 0)?;
706 base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
707 base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
708 base.set_int32_param(ndarray_params.write_file, 0, 0)?;
709 base.set_int32_param(ndarray_params.read_file, 0, 0)?;
710 base.set_int32_param(ndarray_params.capture, 0, 0)?;
711 base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
712 base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
713 base.set_string_param(ndarray_params.file_path, 0, "".into())?;
714 base.set_string_param(ndarray_params.file_name, 0, "".into())?;
715 base.set_int32_param(ndarray_params.file_number, 0, 0)?;
716 base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
717 base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
718 base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
719 base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
720 base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
721
722 base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
724 base.set_string_param(
725 ndarray_params.ad_core_version,
726 0,
727 env!("CARGO_PKG_VERSION").into(),
728 )?;
729 base.set_string_param(
730 ndarray_params.driver_version,
731 0,
732 env!("CARGO_PKG_VERSION").into(),
733 )?;
734 if !ndarray_port.is_empty() {
735 base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
736 }
737
738 let std_array_data_param = if array_data.is_some() {
740 Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
741 } else {
742 None
743 };
744
745 processor.register_params(&mut base)?;
747
748 Ok(Self {
749 base,
750 ndarray_params,
751 plugin_params,
752 param_change_tx,
753 array_data,
754 std_array_data_param,
755 })
756 }
757}
758
/// Copies as many leading elements of `src` as fit into `dst` and returns
/// how many were copied. Elements of `dst` beyond that count are untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = usize::min(src.len(), dst.len());
    let (target, _untouched) = dst.split_at_mut(count);
    target.copy_from_slice(&src[..count]);
    count
}
765
766fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
768where
769 S: CastToF64 + Copy,
770 D: CastFromF64 + Copy,
771{
772 let n = src.len().min(dst.len());
773 for i in 0..n {
774 dst[i] = D::cast_from_f64(src[i].cast_to_f64());
775 }
776 n
777}
778
/// Lossy-where-necessary conversion of a numeric element to `f64` using an
/// `as` cast.
trait CastToF64 {
    /// Returns `self` as an `f64`.
    fn cast_to_f64(self) -> f64;
}

/// Implements [`CastToF64`] with a plain `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64 {
    ($($source:ty),* $(,)?) => {
        $(
            impl CastToF64 for $source {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )*
    };
}

impl_cast_to_f64!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

impl CastToF64 for f64 {
    fn cast_to_f64(self) -> f64 {
        self
    }
}
834
/// Conversion from the `f64` intermediate used by `copy_convert` to a
/// destination element type.
trait CastFromF64 {
    /// Converts `v` to `Self`.
    fn cast_from_f64(v: f64) -> Self;
}

/// Implements [`CastFromF64`] for signed integer targets with wrapping
/// semantics.
///
/// A direct `f64 as iN` cast saturates, which would clip unsigned source data
/// (e.g. `u8` 200 read into an `i8` buffer would become 127). Going through
/// `i64` first and then truncating keeps the low bits instead, so 200 becomes
/// -56 — the same bit pattern the `x as i8` style casts produce when arrays
/// are delivered through the interrupt path. Values outside the `i64` range
/// saturate at `i64` before truncation, and NaN becomes 0.
macro_rules! impl_wrapping_cast_from_f64 {
    ($($target:ty),* $(,)?) => {
        $(
            impl CastFromF64 for $target {
                fn cast_from_f64(v: f64) -> Self {
                    v as i64 as $target
                }
            }
        )*
    };
}

impl_wrapping_cast_from_f64!(i8, i16, i32);

impl CastFromF64 for f32 {
    fn cast_from_f64(v: f64) -> Self {
        v as f32
    }
}
impl CastFromF64 for f64 {
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
865
/// Expands to the body of a `read_*_array` method.
///
/// Copies the latest array held in `$self.array_data` into `$buf` and
/// evaluates to `Ok(count)`. `$direct_variant` names the `NDDataBuffer`
/// variant whose element type equals the buffer's and is copied verbatim with
/// `copy_direct`; every variant listed afterwards is converted with
/// `copy_convert`. The result is `Ok(0)` when the driver has no array slot or
/// the slot is empty. The slot's lock is held for the duration of the copy.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        let copied = match $self.array_data.as_ref() {
            None => 0,
            Some(slot) => {
                let latest = slot.lock();
                match &*latest {
                    None => 0,
                    Some(array) => match &array.data {
                        NDDataBuffer::$direct_variant(values) => copy_direct(values, $buf),
                        $( NDDataBuffer::$variant(values) => copy_convert(values, $buf), )*
                    },
                }
            }
        };
        Ok(copied)
    }};
}
887
888impl PortDriver for PluginPortDriver {
889 fn base(&self) -> &PortDriverBase {
890 &self.base
891 }
892
893 fn base_mut(&mut self) -> &mut PortDriverBase {
894 &mut self.base
895 }
896
897 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
898 let reason = user.reason;
899 let addr = user.addr;
900 self.base.set_int32_param(reason, addr, value)?;
901 self.base.call_param_callbacks(addr)?;
902 let _ = self
903 .param_change_tx
904 .try_send((reason, addr, ParamChangeValue::Int32(value)));
905 Ok(())
906 }
907
908 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
909 let reason = user.reason;
910 let addr = user.addr;
911 self.base.set_float64_param(reason, addr, value)?;
912 self.base.call_param_callbacks(addr)?;
913 let _ = self
914 .param_change_tx
915 .try_send((reason, addr, ParamChangeValue::Float64(value)));
916 Ok(())
917 }
918
919 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
920 let reason = user.reason;
921 let addr = user.addr;
922 let s = String::from_utf8_lossy(data).into_owned();
923 self.base.set_string_param(reason, addr, s.clone())?;
924 self.base.call_param_callbacks(addr)?;
925 let _ = self
926 .param_change_tx
927 .try_send((reason, addr, ParamChangeValue::Octet(s)));
928 Ok(())
929 }
930
931 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
932 impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
933 }
934
935 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
936 impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
937 }
938
939 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
940 impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
941 }
942
943 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
944 impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
945 }
946
947 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
948 impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
949 }
950}
951
/// Cloneable handle to a running plugin: its port runtime, the sender used to
/// feed it arrays, and its downstream output set.
#[derive(Clone)]
pub struct PluginRuntimeHandle {
    /// Runtime of the plugin's asyn port.
    port_runtime: PortRuntimeHandle,
    /// Sender that upstream producers use to deliver arrays to this plugin.
    array_sender: NDArraySender,
    /// Outputs this plugin publishes its results to.
    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Name of the plugin's port.
    port_name: String,
    /// Indices of the shared NDArray driver parameters.
    pub ndarray_params: NDArrayDriverParams,
    /// Indices of the plugin-base parameters.
    pub plugin_params: PluginBaseParams,
}

impl PluginRuntimeHandle {
    /// Runtime handle of the plugin's asyn port.
    pub fn port_runtime(&self) -> &PortRuntimeHandle {
        &self.port_runtime
    }

    /// Sender for delivering arrays to this plugin.
    pub fn array_sender(&self) -> &NDArraySender {
        &self.array_sender
    }

    /// Shared output set that downstream senders are added to.
    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
        &self.array_output
    }

    /// Name of the plugin's port.
    pub fn port_name(&self) -> &str {
        &self.port_name
    }
}
980
/// Creates a plugin runtime with a single asyn address.
///
/// Convenience wrapper around [`create_plugin_runtime_multi_addr`] with
/// `max_addr = 1`; see that function for the meaning of the arguments and
/// the returned pair.
pub fn create_plugin_runtime<P: NDPluginProcess>(
    port_name: &str,
    processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    create_plugin_runtime_multi_addr(
        port_name,
        processor,
        pool,
        queue_size,
        ndarray_port,
        wiring,
        1,
    )
}
1005
/// Creates the port driver, port runtime and data thread for a plugin.
///
/// * `port_name` – name of the plugin's asyn port; also used for the array
///   channel and the data thread's name.
/// * `processor` – plugin-specific processing implementation.
/// * `pool` – array pool handed to the processor.
/// * `queue_size` – capacity of the input array channel.
/// * `ndarray_port` – initial upstream port name (may be empty).
/// * `wiring` – registry used to rewire the plugin when its upstream port
///   parameter changes.
/// * `max_addr` – number of asyn addresses on the port.
///
/// Returns the plugin handle and the join handle of the data thread. The
/// plugin starts with callbacks disabled, non-blocking, unthrottled and with
/// sorting off.
///
/// # Panics
///
/// Panics if the port driver cannot be created or the data thread cannot be
/// spawned.
pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
    port_name: &str,
    mut processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
    max_addr: usize,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    // Parameter writes travel from the port driver to the data thread over
    // this bounded channel.
    let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);

    // Query the processor before it is borrowed by the driver constructor.
    let plugin_type_name = processor.plugin_type().to_string();
    let array_data = processor.array_data_handle();

    let driver = PluginPortDriver::new(
        port_name,
        &plugin_type_name,
        queue_size,
        ndarray_port,
        max_addr,
        param_tx,
        &mut processor,
        array_data,
    )
    .expect("failed to create plugin port driver");

    // Copy out the parameter indices needed later; the driver itself is moved
    // into the port runtime below.
    let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
    let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
    let min_callback_time_reason = driver.plugin_params.min_callback_time;
    let sort_mode_reason = driver.plugin_params.sort_mode;
    let sort_time_reason = driver.plugin_params.sort_time;
    let sort_size_reason = driver.plugin_params.sort_size;
    let ndarray_params = driver.ndarray_params;
    let plugin_params = driver.plugin_params;
    let std_array_data_param = driver.std_array_data_param;

    // NOTE(review): the second element is bound to `_actor_jh` and never
    // used — presumably the port actor's join handle; confirm that not
    // joining it is intended.
    let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());

    let port_handle = port_runtime.port_handle().clone();

    let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);

    // Flags shared between the sender and the data thread; both start off.
    let enabled = Arc::new(AtomicBool::new(false));
    let blocking_mode = Arc::new(AtomicBool::new(false));

    // The output set is shared by the processor state and the returned handle.
    let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
    let array_output_for_handle = array_output.clone();
    let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
        processor,
        output: array_output,
        pool,
        ndarray_params,
        plugin_params,
        port_handle,
        array_counter: 0,
        std_array_data_param,
        min_callback_time: 0.0,
        last_process_time: None,
        sort_mode: 0,
        sort_time: 0.0,
        sort_size: 10,
        sort_buffer: SortBuffer::new(),
        rate_last_counter: 0,
        rate_last_time: std::time::Instant::now(),
    }));

    // Trait-object view of the shared processor for the sender.
    let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
        inner: shared.clone(),
    });

    let data_enabled = enabled.clone();
    let data_blocking = blocking_mode.clone();

    // Taken from the sender before it is consumed by `with_blocking_support`.
    let dropped_count = array_sender.dropped_count_shared();
    let queue_tx = array_sender.tx_clone();

    // NOTE(review): presumably lets the sender run the processor inline when
    // blocking mode is on — confirm in the channel module.
    let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);

    let nd_array_port_reason = plugin_params.nd_array_port;
    let sender_port_name = port_name.to_string();
    let initial_upstream = ndarray_port.to_string();

    let data_jh = thread::Builder::new()
        .name(format!("plugin-data-{port_name}"))
        .spawn(move || {
            plugin_data_loop(
                shared,
                array_rx,
                param_rx,
                enable_callbacks_reason,
                blocking_callbacks_reason,
                min_callback_time_reason,
                sort_mode_reason,
                sort_time_reason,
                sort_size_reason,
                data_enabled,
                data_blocking,
                nd_array_port_reason,
                sender_port_name,
                initial_upstream,
                wiring,
                dropped_count,
                queue_tx,
            );
        })
        .expect("failed to spawn plugin data thread");

    let handle = PluginRuntimeHandle {
        port_runtime,
        array_sender,
        array_output: array_output_for_handle,
        port_name: port_name.to_string(),
        ndarray_params,
        plugin_params,
    };

    (handle, data_jh)
}
1139
1140fn plugin_data_loop<P: NDPluginProcess>(
1141 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1142 mut array_rx: NDArrayReceiver,
1143 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
1144 enable_callbacks_reason: usize,
1145 blocking_callbacks_reason: usize,
1146 min_callback_time_reason: usize,
1147 sort_mode_reason: usize,
1148 sort_time_reason: usize,
1149 sort_size_reason: usize,
1150 enabled: Arc<AtomicBool>,
1151 blocking_mode: Arc<AtomicBool>,
1152 nd_array_port_reason: usize,
1153 sender_port_name: String,
1154 initial_upstream: String,
1155 wiring: Arc<WiringRegistry>,
1156 dropped_count: Arc<std::sync::atomic::AtomicU64>,
1157 queue_tx: tokio::sync::mpsc::Sender<super::channel::ArrayMessage>,
1158) {
1159 let mut current_upstream = initial_upstream;
1160 let rt = tokio::runtime::Builder::new_current_thread()
1161 .enable_all()
1162 .build()
1163 .unwrap();
1164 rt.block_on(async {
1165 let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1168 let mut sort_flush_active = false;
1169
1170 loop {
1171 tokio::select! {
1172 msg = array_rx.recv_msg() => {
1173 match msg {
1174 Some(msg) => {
1175 if !blocking_mode.load(Ordering::Acquire) {
1178 shared.lock().process_and_publish(&msg.array);
1179 }
1180 let guard = shared.lock();
1184 let queue_free = queue_tx.capacity() as i32;
1185 let dropped = dropped_count.load(Ordering::Relaxed) as i32;
1186 guard.port_handle.write_int32_no_wait(
1187 guard.plugin_params.queue_use, 0, queue_free,
1188 );
1189 guard.port_handle.write_int32_no_wait(
1190 guard.plugin_params.dropped_arrays, 0, dropped,
1191 );
1192 drop(guard);
1193 }
1194 None => break,
1195 }
1196 }
1197 param = param_rx.recv() => {
1198 match param {
1199 Some((reason, addr, value)) => {
1200 if reason == enable_callbacks_reason {
1201 enabled.store(value.as_i32() != 0, Ordering::Release);
1202 }
1203 if reason == blocking_callbacks_reason {
1204 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1205 }
1206 if reason == min_callback_time_reason {
1208 shared.lock().min_callback_time = value.as_f64();
1209 }
1210 if reason == sort_mode_reason {
1212 let mode = value.as_i32();
1213 let mut guard = shared.lock();
1214 guard.sort_mode = mode;
1215 if mode == 0 {
1216 guard.flush_sort_buffer();
1218 sort_flush_active = false;
1219 } else {
1220 sort_flush_active = guard.sort_time > 0.0;
1222 if sort_flush_active {
1223 let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1224 sort_flush_interval = tokio::time::interval(dur);
1225 }
1226 }
1227 drop(guard);
1228 }
1229 if reason == sort_time_reason {
1230 let t = value.as_f64();
1231 let mut guard = shared.lock();
1232 guard.sort_time = t;
1233 if guard.sort_mode != 0 && t > 0.0 {
1234 sort_flush_active = true;
1235 let dur = std::time::Duration::from_secs_f64(t);
1236 sort_flush_interval = tokio::time::interval(dur);
1237 } else {
1238 sort_flush_active = false;
1239 }
1240 drop(guard);
1241 }
1242 if reason == sort_size_reason {
1243 shared.lock().sort_size = value.as_i32();
1244 }
1245 if reason == nd_array_port_reason {
1247 if let Some(new_port) = value.as_string() {
1248 if new_port != current_upstream {
1249 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
1250 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
1251 eprintln!("NDArrayPort rewire failed: {e}");
1252 current_upstream = old;
1253 }
1254 }
1255 }
1256 }
1257 let snapshot = PluginParamSnapshot {
1258 enable_callbacks: enabled.load(Ordering::Acquire),
1259 reason,
1260 addr,
1261 value,
1262 };
1263 let mut guard = shared.lock();
1264 let t0 = std::time::Instant::now();
1265 let result = guard.processor.on_param_change(reason, &snapshot);
1266 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1267 if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1268 guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
1269 }
1270 drop(guard);
1271 }
1272 None => break,
1273 }
1274 }
1275 _ = sort_flush_interval.tick(), if sort_flush_active => {
1276 shared.lock().flush_sort_buffer();
1277 }
1278 }
1279 }
1280 });
1281}
1282
/// Connects a downstream consumer to `upstream` by adding
/// `downstream_sender` to the upstream plugin's output set.
pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
    upstream.array_output().lock().add(downstream_sender);
}
1287
1288pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1290 port_name: &str,
1291 mut processor: P,
1292 pool: Arc<NDArrayPool>,
1293 queue_size: usize,
1294 output: NDArrayOutput,
1295 ndarray_port: &str,
1296 wiring: Arc<WiringRegistry>,
1297) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1298 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1299
1300 let plugin_type_name = processor.plugin_type().to_string();
1301 let array_data = processor.array_data_handle();
1302 let driver = PluginPortDriver::new(
1303 port_name,
1304 &plugin_type_name,
1305 queue_size,
1306 ndarray_port,
1307 1,
1308 param_tx,
1309 &mut processor,
1310 array_data,
1311 )
1312 .expect("failed to create plugin port driver");
1313
1314 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1315 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1316 let min_callback_time_reason = driver.plugin_params.min_callback_time;
1317 let sort_mode_reason = driver.plugin_params.sort_mode;
1318 let sort_time_reason = driver.plugin_params.sort_time;
1319 let sort_size_reason = driver.plugin_params.sort_size;
1320 let ndarray_params = driver.ndarray_params;
1321 let plugin_params = driver.plugin_params;
1322 let std_array_data_param = driver.std_array_data_param;
1323
1324 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1325
1326 let port_handle = port_runtime.port_handle().clone();
1327
1328 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1329
1330 let enabled = Arc::new(AtomicBool::new(false));
1331 let blocking_mode = Arc::new(AtomicBool::new(false));
1332
1333 let array_output = Arc::new(parking_lot::Mutex::new(output));
1334 let array_output_for_handle = array_output.clone();
1335 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1336 processor,
1337 output: array_output,
1338 pool,
1339 ndarray_params,
1340 plugin_params,
1341 port_handle,
1342 array_counter: 0,
1343 std_array_data_param,
1344 min_callback_time: 0.0,
1345 last_process_time: None,
1346 sort_mode: 0,
1347 sort_time: 0.0,
1348 sort_size: 10,
1349 sort_buffer: SortBuffer::new(),
1350 rate_last_counter: 0,
1351 rate_last_time: std::time::Instant::now(),
1352 }));
1353
1354 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1355 inner: shared.clone(),
1356 });
1357
1358 let data_enabled = enabled.clone();
1359 let data_blocking = blocking_mode.clone();
1360
1361 let dropped_count = array_sender.dropped_count_shared();
1363 let queue_tx = array_sender.tx_clone();
1364
1365 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1366
1367 let nd_array_port_reason = plugin_params.nd_array_port;
1369 let sender_port_name = port_name.to_string();
1370 let initial_upstream = ndarray_port.to_string();
1371
1372 let data_jh = thread::Builder::new()
1373 .name(format!("plugin-data-{port_name}"))
1374 .spawn(move || {
1375 plugin_data_loop(
1376 shared,
1377 array_rx,
1378 param_rx,
1379 enable_callbacks_reason,
1380 blocking_callbacks_reason,
1381 min_callback_time_reason,
1382 sort_mode_reason,
1383 sort_time_reason,
1384 sort_size_reason,
1385 data_enabled,
1386 data_blocking,
1387 nd_array_port_reason,
1388 sender_port_name,
1389 initial_upstream,
1390 wiring,
1391 dropped_count,
1392 queue_tx,
1393 );
1394 })
1395 .expect("failed to spawn plugin data thread");
1396
1397 let handle = PluginRuntimeHandle {
1398 port_runtime,
1399 array_sender,
1400 array_output: array_output_for_handle,
1401 port_name: port_name.to_string(),
1402 ndarray_params,
1403 plugin_params,
1404 };
1405
1406 (handle, data_jh)
1407}
1408
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::ndarray_channel;

    /// Test processor that emits a clone of every input array.
    struct PassthroughProcessor;

    impl NDPluginProcess for PassthroughProcessor {
        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            ProcessResult::arrays(vec![Arc::new(array.clone())])
        }
        fn plugin_type(&self) -> &str {
            "Passthrough"
        }
    }

    /// Test processor that counts the arrays it sees and returns an empty result.
    struct SinkProcessor {
        count: usize,
    }

    impl NDPluginProcess for SinkProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count += 1;
            ProcessResult::empty()
        }
        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }

    // ----- shared fixtures -------------------------------------------------

    /// Builds a `UInt8` array from a single `NDDimension::new(4)` with the given `unique_id`.
    fn make_test_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    /// Fresh, empty wiring registry.
    fn test_wiring() -> Arc<WiringRegistry> {
        Arc::new(WiringRegistry::new())
    }

    /// Array pool used by every runtime test.
    fn test_pool() -> Arc<NDArrayPool> {
        Arc::new(NDArrayPool::new(1_000_000))
    }

    /// Sleeps for `ms` milliseconds so the data thread can pick up a parameter
    /// change or process queued arrays.
    fn settle(ms: u64) {
        std::thread::sleep(std::time::Duration::from_millis(ms));
    }

    /// Writes an int32 parameter (address 0) through the plugin's port handle.
    ///
    /// Panics if the write fails; `#[track_caller]` makes the panic point at the test.
    #[track_caller]
    fn write_int32(handle: &PluginRuntimeHandle, reason: usize, value: i32) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(reason, 0, value)
            .unwrap();
    }

    /// Sets `EnableCallbacks` to 1 and waits briefly for the data thread to see it.
    #[track_caller]
    fn enable_callbacks(handle: &PluginRuntimeHandle) {
        write_int32(handle, handle.plugin_params.enable_callbacks, 1);
        settle(10);
    }

    /// Writes `value` to `BlockingCallbacks` and waits for the data thread to see it.
    #[track_caller]
    fn set_blocking_callbacks(handle: &PluginRuntimeHandle, value: i32) {
        write_int32(handle, handle.plugin_params.blocking_callbacks, value);
        settle(50);
    }

    /// Creates an output with one downstream channel (capacity 10) and returns
    /// the output together with the receiving end of that channel.
    fn downstream_output(name: &str) -> (NDArrayOutput, NDArrayReceiver) {
        let (sender, rx) = ndarray_channel(name, 10);
        let mut output = NDArrayOutput::new();
        output.add(sender);
        (output, rx)
    }

    /// Starts a passthrough plugin named `port_name` wired to one "DOWNSTREAM" channel.
    ///
    /// The receiver comes first in the tuple so that, when destructured in a
    /// single `let`, it is dropped after the handle and the join handle.
    fn passthrough_with_downstream(
        port_name: &str,
    ) -> (
        NDArrayReceiver,
        PluginRuntimeHandle,
        std::thread::JoinHandle<()>,
    ) {
        let pool = test_pool();
        let (output, rx) = downstream_output("DOWNSTREAM");
        let (handle, data_jh) = create_plugin_runtime_with_output(
            port_name,
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        (rx, handle, data_jh)
    }

    /// Blocks until an array arrives on `rx` and returns its `unique_id`.
    #[track_caller]
    fn recv_id(rx: &mut NDArrayReceiver) -> i32 {
        rx.blocking_recv().unwrap().unique_id
    }

    /// Asserts that nothing arrives on `rx` within `ms` milliseconds.
    ///
    /// Uses a throwaway current-thread tokio runtime to drive the timeout.
    #[track_caller]
    fn assert_no_array_within(rx: &mut NDArrayReceiver, ms: u64, msg: &str) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let result = rt.block_on(async {
            tokio::time::timeout(std::time::Duration::from_millis(ms), rx.recv()).await
        });
        assert!(result.is_err(), "{msg}");
    }

    // ----- runtime tests ---------------------------------------------------

    /// An array sent to an enabled passthrough plugin is expected downstream.
    #[test]
    fn test_passthrough_runtime() {
        let (mut downstream_rx, handle, _data_jh) = passthrough_with_downstream("PASS1");
        enable_callbacks(&handle);

        handle.array_sender().send(make_test_array(42));

        assert_eq!(recv_id(&mut downstream_rx), 42);
    }

    /// A sink plugin accepts arrays without producing output.
    ///
    /// NOTE(review): the processor is moved into the runtime, so `count` is not
    /// observable here; the test only checks the handle is still usable.
    #[test]
    fn test_sink_runtime() {
        let (handle, _data_jh) = create_plugin_runtime(
            "SINK1",
            SinkProcessor { count: 0 },
            test_pool(),
            10,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        handle.array_sender().send(make_test_array(1));
        handle.array_sender().send(make_test_array(2));

        settle(50);

        assert_eq!(handle.port_name(), "SINK1");
    }

    /// The handle and its port runtime both report the configured port name.
    #[test]
    fn test_plugin_type_param() {
        let (handle, _data_jh) = create_plugin_runtime(
            "TYPE_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        assert_eq!(handle.port_name(), "TYPE_TEST");
        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
    }

    /// Dropping the handle and every sender clone is expected to let the data thread exit.
    #[test]
    fn test_shutdown_on_handle_drop() {
        let (handle, data_jh) = create_plugin_runtime(
            "SHUTDOWN_TEST",
            PassthroughProcessor,
            test_pool(),
            10,
            "",
            test_wiring(),
        );

        let sender = handle.array_sender().clone();
        drop(handle);
        drop(sender);

        let result = data_jh.join();
        assert!(result.is_ok());
    }

    /// With a queue of size 1 and a slow processor, a burst of sends is expected
    /// to register dropped arrays.
    #[test]
    fn test_dropped_count_when_queue_full() {
        struct SlowProcessor;
        impl NDPluginProcess for SlowProcessor {
            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                std::thread::sleep(std::time::Duration::from_millis(100));
                ProcessResult::empty()
            }
            fn plugin_type(&self) -> &str {
                "Slow"
            }
        }

        let (handle, _data_jh) = create_plugin_runtime(
            "DROP_TEST",
            SlowProcessor,
            test_pool(),
            1,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        for i in 0..10 {
            handle.array_sender().send(make_test_array(i));
        }

        assert!(handle.array_sender().dropped_count() > 0);
    }

    /// With blocking callbacks on, a sent array is still expected downstream.
    #[test]
    fn test_blocking_callbacks_basic() {
        let (mut downstream_rx, handle, _data_jh) = passthrough_with_downstream("BLOCK_TEST");
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        handle.array_sender().send(make_test_array(42));

        assert_eq!(recv_id(&mut downstream_rx), 42);
    }

    /// Arrays are expected downstream both in blocking mode and after switching back.
    #[test]
    fn test_blocking_to_nonblocking_switch() {
        let (mut downstream_rx, handle, _data_jh) = passthrough_with_downstream("SWITCH_TEST");
        enable_callbacks(&handle);

        set_blocking_callbacks(&handle, 1);
        handle.array_sender().send(make_test_array(1));
        assert_eq!(recv_id(&mut downstream_rx), 1);

        set_blocking_callbacks(&handle, 0);
        handle.array_sender().send(make_test_array(2));
        assert_eq!(recv_id(&mut downstream_rx), 2);
    }

    /// With `EnableCallbacks` written as 0, nothing is expected downstream.
    #[test]
    fn test_enable_callbacks_disables_processing() {
        let (mut downstream_rx, handle, _data_jh) = passthrough_with_downstream("ENABLE_TEST");

        write_int32(&handle, handle.plugin_params.enable_callbacks, 0);
        settle(50);

        handle.array_sender().send(make_test_array(99));

        assert_no_array_within(
            &mut downstream_rx,
            100,
            "should not receive array when callbacks disabled",
        );
    }

    /// In blocking mode every registered downstream channel is expected to get the array.
    #[test]
    fn test_blocking_downstream_receives() {
        let pool = test_pool();

        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
        let mut output = NDArrayOutput::new();
        output.add(ds1);
        output.add(ds2);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "BLOCK_DS_TEST",
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        handle.array_sender().send(make_test_array(77));

        // Receive from both channels before asserting on either.
        let r1 = rx1.blocking_recv().unwrap();
        let r2 = rx2.blocking_recv().unwrap();
        assert_eq!(r1.unique_id, 77);
        assert_eq!(r2.unique_id, 77);
    }

    /// Re-writing a parameter while in blocking mode is expected to leave the pipeline working.
    #[test]
    fn test_blocking_param_updates() {
        let pool = test_pool();

        struct ParamTracker;
        impl NDPluginProcess for ParamTracker {
            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                ProcessResult::arrays(vec![Arc::new(array.clone())])
            }
            fn plugin_type(&self) -> &str {
                "ParamTracker"
            }
        }

        let (output, mut downstream_rx) = downstream_output("DOWNSTREAM");

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "PARAM_TEST",
            ParamTracker,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        handle.array_sender().send(make_test_array(1));
        assert_eq!(recv_id(&mut downstream_rx), 1);

        write_int32(&handle, handle.plugin_params.enable_callbacks, 1);
        settle(50);

        handle.array_sender().send(make_test_array(2));
        assert_eq!(recv_id(&mut downstream_rx), 2);
    }

    /// Sending in blocking mode from inside a current-thread tokio runtime must not panic.
    #[test]
    fn test_no_panic_in_current_thread_runtime() {
        let (mut downstream_rx, handle, _data_jh) =
            passthrough_with_downstream("CURRENT_THREAD_TEST");
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, 1);

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            handle.array_sender().send(make_test_array(99));
        });

        assert_eq!(recv_id(&mut downstream_rx), 99);
    }

    // ----- SortBuffer unit tests --------------------------------------------

    /// Entries inserted out of order are expected to drain in ascending id order.
    #[test]
    fn test_sort_buffer_reorders_by_unique_id() {
        let mut buf = SortBuffer::new();

        buf.insert(3, vec![make_test_array(3)], 10);
        buf.insert(1, vec![make_test_array(1)], 10);
        buf.insert(2, vec![make_test_array(2)], 10);

        assert_eq!(buf.len(), 3);

        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.last_emitted_id, 3);
    }

    /// Inserting an id lower than one already drained is expected to count as disordered.
    #[test]
    fn test_sort_buffer_detects_disordered() {
        let mut buf = SortBuffer::new();

        buf.insert(5, vec![make_test_array(5)], 10);
        buf.drain_all();
        buf.insert(3, vec![make_test_array(3)], 10);
        assert_eq!(buf.disordered_arrays, 1);
    }

    /// Inserting beyond the size limit is expected to drop the lowest id.
    #[test]
    fn test_sort_buffer_drops_when_full() {
        let mut buf = SortBuffer::new();

        buf.insert(1, vec![make_test_array(1)], 2);
        buf.insert(2, vec![make_test_array(2)], 2);
        buf.insert(3, vec![make_test_array(3)], 2);

        assert_eq!(buf.len(), 2);
        assert_eq!(buf.dropped_output_arrays, 1);

        let drained = buf.drain_all();
        let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
        assert_eq!(ids, vec![2, 3], "oldest (id=1) should have been dropped");
    }

    /// With sort mode on, arrays are expected to be held back; turning it off is
    /// expected to release them in ascending id order.
    #[test]
    fn test_sort_mode_runtime_integration() {
        let (mut downstream_rx, handle, _data_jh) = passthrough_with_downstream("SORT_TEST");
        enable_callbacks(&handle);

        write_int32(&handle, handle.plugin_params.sort_size, 10);
        write_int32(&handle, handle.plugin_params.sort_mode, 1);
        settle(50);

        handle.array_sender().send(make_test_array(3));
        handle.array_sender().send(make_test_array(1));
        handle.array_sender().send(make_test_array(2));
        settle(100);

        assert_no_array_within(
            &mut downstream_rx,
            50,
            "arrays should be buffered while sort mode is active",
        );

        write_int32(&handle, handle.plugin_params.sort_mode, 0);
        settle(100);

        // Receive all three before asserting on any of them.
        let r1 = downstream_rx.blocking_recv().unwrap();
        let r2 = downstream_rx.blocking_recv().unwrap();
        let r3 = downstream_rx.blocking_recv().unwrap();
        assert_eq!(r1.unique_id, 1);
        assert_eq!(r2.unique_id, 2);
        assert_eq!(r3.unique_id, 3);
    }
}