1use std::collections::BTreeMap;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::thread;
20
21use asyn_rs::error::AsynResult;
22use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
23use asyn_rs::runtime::config::RuntimeConfig;
24use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
25use asyn_rs::user::AsynUser;
26
27use asyn_rs::port_handle::PortHandle;
28
29use crate::ndarray::NDArray;
30use crate::ndarray_pool::NDArrayPool;
31use crate::params::ndarray_driver::NDArrayDriverParams;
32
33use super::channel::{NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel};
34use super::params::PluginBaseParams;
35use super::wiring::{WiringRegistry, upstream_key};
36
/// A value written to a plugin parameter, tagged with the data type of the write.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// Value delivered through an int32 write.
    Int32(i32),
    /// Value delivered through a float64 write.
    Float64(f64),
    /// Value delivered through an octet (string) write.
    Octet(String),
}

impl ParamChangeValue {
    /// Returns the value as an `i32`.
    ///
    /// Floats are converted with an `as` cast (truncating toward zero and
    /// saturating at the `i32` bounds); string values yield `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(n) => n,
            Self::Float64(x) => x as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Returns the value as an `f64`; string values yield `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(n) => n as f64,
            Self::Float64(x) => x,
            Self::Octet(_) => 0.0,
        }
    }

    /// Returns the string payload, or `None` for numeric values.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
69
/// A parameter write requested by a plugin, identified by `reason` and `addr`.
///
/// Each variant carries the parameter index (`reason`), the asyn address
/// (`addr`) and the new value.
pub enum ParamUpdate {
    /// Set a 32-bit integer parameter.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// Set a 64-bit float parameter.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// Set a string parameter.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
    /// Set a float64 array parameter.
    Float64Array {
        reason: usize,
        addr: i32,
        value: Vec<f64>,
    },
}

impl ParamUpdate {
    /// Int32 update at address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Float64 update at address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Int32 update at an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Float64 update at an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// Float64 array update at address 0.
    pub fn float64_array(reason: usize, value: Vec<f64>) -> Self {
        Self::float64_array_addr(reason, 0, value)
    }

    /// Float64 array update at an explicit address.
    pub fn float64_array_addr(reason: usize, addr: i32, value: Vec<f64>) -> Self {
        Self::Float64Array {
            reason,
            addr,
            value,
        }
    }

    /// String update at address 0.
    pub fn octet(reason: usize, value: String) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// String update at an explicit address.
    pub fn octet_addr(reason: usize, addr: i32, value: String) -> Self {
        Self::Octet {
            reason,
            addr,
            value,
        }
    }
}
160
161pub struct ProcessResult {
163 pub output_arrays: Vec<Arc<NDArray>>,
164 pub param_updates: Vec<ParamUpdate>,
165 pub scatter_index: Option<usize>,
167}
168
169impl ProcessResult {
170 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
172 Self {
173 output_arrays: vec![],
174 param_updates,
175 scatter_index: None,
176 }
177 }
178
179 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
181 Self {
182 output_arrays,
183 param_updates: vec![],
184 scatter_index: None,
185 }
186 }
187
188 pub fn empty() -> Self {
190 Self {
191 output_arrays: vec![],
192 param_updates: vec![],
193 scatter_index: None,
194 }
195 }
196
197 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
199 Self {
200 output_arrays,
201 param_updates: vec![],
202 scatter_index: Some(index),
203 }
204 }
205}
206
207pub struct ParamChangeResult {
209 pub output_arrays: Vec<Arc<NDArray>>,
210 pub param_updates: Vec<ParamUpdate>,
211}
212
213impl ParamChangeResult {
214 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
215 Self {
216 output_arrays: vec![],
217 param_updates,
218 }
219 }
220
221 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
222 Self {
223 output_arrays,
224 param_updates: vec![],
225 }
226 }
227
228 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
229 Self {
230 output_arrays,
231 param_updates,
232 }
233 }
234
235 pub fn empty() -> Self {
236 Self {
237 output_arrays: vec![],
238 param_updates: vec![],
239 }
240 }
241}
242
/// Processing logic plugged into the plugin runtime.
///
/// Implementors must be `Send + 'static`; the runtime moves the processor
/// into shared state that is handed to a dedicated data thread.
pub trait NDPluginProcess: Send + 'static {
    /// Processes one input array and returns the resulting arrays and
    /// parameter updates. `pool` is the array pool owned by the runtime.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Name of the plugin type; stored in the port's plugin-type parameter
    /// when the port driver is created.
    fn plugin_type(&self) -> &str;

    /// Whether the plugin declares itself compression-aware. Defaults to
    /// `false`. The runtime records this flag at creation time.
    // NOTE(review): how the flag changes array handling is not visible in
    // this part of the file — confirm against the data loop.
    fn compression_aware(&self) -> bool {
        false
    }

    /// Hook for creating plugin-specific parameters on the port driver.
    /// Called once while the port driver is being constructed, after the
    /// base parameters exist. The default registers nothing.
    fn register_params(
        &mut self,
        _base: &mut PortDriverBase,
    ) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Hook invoked for a parameter change; `_reason` is the parameter index
    /// and `_params` a snapshot describing the change. The default returns
    /// an empty result.
    fn on_param_change(
        &mut self,
        _reason: usize,
        _params: &PluginParamSnapshot,
    ) -> ParamChangeResult {
        ParamChangeResult::empty()
    }

    /// Optional shared slot holding the latest array for array reads.
    /// When this returns `Some`, the port driver creates a `STD_ARRAY_DATA`
    /// parameter and serves its array read methods from the slot. The
    /// default returns `None`.
    fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
        None
    }
}
285
/// Description of a single parameter change handed to
/// [`NDPluginProcess::on_param_change`].
pub struct PluginParamSnapshot {
    /// Callback-enable state carried with the change.
    // NOTE(review): presumably the plugin's current EnableCallbacks value —
    // confirm where the snapshot is built.
    pub enable_callbacks: bool,
    /// Index of the parameter that changed.
    pub reason: usize,
    /// asyn address the write was directed at.
    pub addr: i32,
    /// The newly written value.
    pub value: ParamChangeValue,
}
296
/// One bucket of the sort buffer: all arrays buffered under a single unique ID.
struct SortEntry {
    /// Arrays buffered under this unique ID, in insertion order.
    arrays: Vec<Arc<NDArray>>,
    /// Time the bucket was created (first insertion for this unique ID).
    inserted: std::time::Instant,
}
304
305struct SortBuffer {
313 entries: BTreeMap<i32, SortEntry>,
315 prev_unique_id: i32,
317 first_output: bool,
319 disordered_arrays: i32,
321 dropped_output_arrays: i32,
324}
325
326impl SortBuffer {
327 fn new() -> Self {
328 Self {
329 entries: BTreeMap::new(),
330 prev_unique_id: 0,
331 first_output: true,
332 disordered_arrays: 0,
333 dropped_output_arrays: 0,
334 }
335 }
336
337 fn order_ok(&self, unique_id: i32) -> bool {
339 unique_id == self.prev_unique_id || unique_id == self.prev_unique_id + 1
340 }
341
342 fn note_emitted(&mut self, unique_id: i32) {
345 if !self.first_output && !self.order_ok(unique_id) {
346 self.disordered_arrays += 1;
347 }
348 self.first_output = false;
349 self.prev_unique_id = unique_id;
350 }
351
352 fn insert(&mut self, unique_id: i32, arrays: Vec<Arc<NDArray>>, sort_size: i32) -> bool {
357 if sort_size > 0 && self.entries.len() as i32 >= sort_size {
358 self.dropped_output_arrays += 1;
359 return false;
360 }
361 self.entries
362 .entry(unique_id)
363 .or_insert_with(|| SortEntry {
364 arrays: Vec::new(),
365 inserted: std::time::Instant::now(),
366 })
367 .arrays
368 .extend(arrays);
369 true
370 }
371
372 fn drain_ready(&mut self, sort_time: f64) -> Vec<(i32, Vec<Arc<NDArray>>)> {
376 let now = std::time::Instant::now();
377 let mut out = Vec::new();
378 while let Some((&head_id, entry)) = self.entries.iter().next() {
379 let delta = now.duration_since(entry.inserted).as_secs_f64();
380 let order_ok = self.order_ok(head_id);
381 if (!self.first_output && order_ok) || delta > sort_time {
382 let entry = self.entries.remove(&head_id).unwrap();
383 self.note_emitted(head_id);
384 out.push((head_id, entry.arrays));
385 } else {
386 break;
387 }
388 }
389 out
390 }
391
392 fn drain_all(&mut self) -> Vec<(i32, Vec<Arc<NDArray>>)> {
395 let entries = std::mem::take(&mut self.entries);
396 let mut out = Vec::with_capacity(entries.len());
397 for (id, entry) in entries {
398 self.note_emitted(id);
399 out.push((id, entry.arrays));
400 }
401 out
402 }
403
404 fn len(&self) -> i32 {
406 self.entries.len() as i32
407 }
408}
409
/// State shared by the plugin's processing paths: the processor itself plus
/// the throttling, sorting and bookkeeping needed to publish its results.
struct SharedProcessorInner<P: NDPluginProcess> {
    /// The plugin's processing implementation.
    processor: P,
    /// Output registered with the wiring registry for this port.
    output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Array pool passed to `process_array`.
    pool: Arc<NDArrayPool>,
    /// Parameter indices of the common NDArray driver parameters.
    ndarray_params: NDArrayDriverParams,
    /// Parameter indices of the plugin base parameters.
    plugin_params: PluginBaseParams,
    /// Handle used to raise interrupts on the plugin's port.
    port_handle: PortHandle,
    /// Count of publish batches that had an array to report on.
    array_counter: i32,
    /// Index of the `STD_ARRAY_DATA` parameter, when the plugin exposes array data.
    std_array_data_param: Option<usize>,
    /// Minimum time in seconds between processed arrays; `<= 0` disables the limit.
    min_callback_time: f64,
    /// Start time of the most recent `process_array` call.
    last_process_time: Option<std::time::Instant>,
    /// Non-zero enables output sorting through `sort_buffer`.
    sort_mode: i32,
    /// Seconds a buffered array may wait before being released regardless of order.
    sort_time: f64,
    /// Maximum number of buckets in the sort buffer (only enforced when positive).
    sort_size: i32,
    /// Reorder buffer and its disordered/dropped counters.
    sort_buffer: SortBuffer,
    /// Count of input arrays dropped; shared with the array sender's counter.
    dropped_arrays: Arc<std::sync::atomic::AtomicI32>,
    /// Value reported by the processor's `compression_aware()` at creation.
    compression_aware: bool,
    /// Output byte-rate limit; `0.0` disables output throttling.
    max_byte_rate: f64,
    /// Byte-budget throttler consulted for each output array.
    throttler: super::throttler::Throttler,
    /// Most recent input array that was processed, kept for reprocessing.
    prev_input_array: Option<Arc<NDArray>>,
    /// Dimensions last announced through the array-dimensions interrupt.
    dims_prev: Vec<i32>,
    // NOTE(review): the three fields below are initialised to 0 / 1 / 1 at
    // creation and not read in the visible part of the file — confirm usage.
    nd_array_addr: i32,
    max_threads: i32,
    num_threads: i32,
}
461
462impl<P: NDPluginProcess> SharedProcessorInner<P> {
463 fn should_throttle(&self) -> bool {
464 if self.min_callback_time <= 0.0 {
465 return false;
466 }
467 if let Some(last) = self.last_process_time {
468 last.elapsed().as_secs_f64() < self.min_callback_time
469 } else {
470 false
471 }
472 }
473
474 fn array_byte_cost(array: &NDArray) -> f64 {
477 match &array.codec {
478 Some(c) => c.compressed_size as f64,
479 None => array.info().total_bytes as f64,
480 }
481 }
482
483 fn throttle_ok(&mut self, array: &NDArray) -> bool {
486 if self.max_byte_rate == 0.0 {
487 return true;
488 }
489 let cost = Self::array_byte_cost(array);
490 if self.throttler.try_take(cost) {
491 true
492 } else {
493 self.sort_buffer.dropped_output_arrays += 1;
494 false
495 }
496 }
497
498 fn route_output_arrays(&mut self, arrays: Vec<Arc<NDArray>>) -> Vec<Arc<NDArray>> {
506 let mut ready = Vec::new();
507 for arr in arrays {
508 if !self.throttle_ok(&arr) {
509 continue; }
511 let uid = arr.unique_id;
512 if self.sort_mode != 0
513 && !self.sort_buffer.first_output
514 && !self.sort_buffer.order_ok(uid)
515 {
516 self.sort_buffer.insert(uid, vec![arr], self.sort_size);
518 } else {
519 self.sort_buffer.note_emitted(uid);
521 ready.push(arr);
522 }
523 }
524 if self.sort_mode != 0 {
527 for (_id, mut bucket) in self.sort_buffer.drain_ready(self.sort_time) {
528 ready.append(&mut bucket);
529 }
530 }
531 ready
532 }
533
534 fn process_and_publish(&mut self, array: &Arc<NDArray>) -> Option<ProcessOutput> {
538 if self.should_throttle() {
540 self.dropped_arrays
541 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
542 return Some(self.dropped_arrays_only_batch());
543 }
544 self.prev_input_array = Some(Arc::clone(array));
549 let t0 = std::time::Instant::now();
550 let result = self.processor.process_array(array, &self.pool);
551 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
552 self.last_process_time = Some(t0);
553
554 let ready = self.route_output_arrays(result.output_arrays);
555 let mut output = self.build_publish_batch(
556 ready,
557 result.param_updates,
558 result.scatter_index,
559 Some(array.as_ref()),
560 elapsed_ms,
561 );
562 output.batch.merge(self.build_status_params_batch());
563 Some(output)
564 }
565
566 fn dropped_arrays_only_batch(&self) -> ProcessOutput {
569 ProcessOutput {
570 arrays: vec![],
571 scatter_index: None,
572 batch: self.build_status_params_batch(),
573 }
574 }
575
576 fn process_plugin(&mut self) -> Option<ProcessOutput> {
579 let prev = self.prev_input_array.clone()?;
580 self.process_and_publish(&prev)
581 }
582
583 fn tick_sort_buffer(&mut self) -> ProcessOutput {
586 let entries = self.sort_buffer.drain_ready(self.sort_time);
587 self.emit_drained(entries)
588 }
589
590 fn flush_sort_buffer(&mut self) -> ProcessOutput {
592 let entries = self.sort_buffer.drain_all();
593 self.emit_drained(entries)
594 }
595
596 fn emit_drained(&mut self, entries: Vec<(i32, Vec<Arc<NDArray>>)>) -> ProcessOutput {
597 let mut all_arrays = Vec::new();
598 let mut combined = ParamBatch::empty();
599 for (_unique_id, arrays) in entries {
600 let output = self.build_publish_batch(arrays, vec![], None, None, 0.0);
601 all_arrays.extend(output.arrays);
602 combined.merge(output.batch);
603 }
604 combined.merge(self.build_sort_params_batch());
605 ProcessOutput {
606 arrays: all_arrays,
607 scatter_index: None,
608 batch: combined,
609 }
610 }
611
612 fn build_sort_params_batch(&self) -> ParamBatch {
613 use asyn_rs::request::ParamSetValue;
614 let sort_free = self.sort_size - self.sort_buffer.len();
615 ParamBatch {
616 addr0: vec![
617 ParamSetValue::Int32 {
618 reason: self.plugin_params.sort_free,
619 addr: 0,
620 value: sort_free,
621 },
622 ParamSetValue::Int32 {
623 reason: self.plugin_params.disordered_arrays,
624 addr: 0,
625 value: self.sort_buffer.disordered_arrays,
626 },
627 ParamSetValue::Int32 {
628 reason: self.plugin_params.dropped_output_arrays,
629 addr: 0,
630 value: self.sort_buffer.dropped_output_arrays,
631 },
632 ],
633 extra: std::collections::HashMap::new(),
634 }
635 }
636
637 fn build_status_params_batch(&self) -> ParamBatch {
640 use asyn_rs::request::ParamSetValue;
641 let mut batch = self.build_sort_params_batch();
642 batch.addr0.push(ParamSetValue::Int32 {
643 reason: self.plugin_params.dropped_arrays,
644 addr: 0,
645 value: self
646 .dropped_arrays
647 .load(std::sync::atomic::Ordering::Acquire),
648 });
649 batch
650 }
651
652 fn build_publish_batch(
656 &mut self,
657 output_arrays: Vec<Arc<NDArray>>,
658 param_updates: Vec<ParamUpdate>,
659 scatter_index: Option<usize>,
660 fallback_array: Option<&NDArray>,
661 elapsed_ms: f64,
662 ) -> ProcessOutput {
663 use asyn_rs::request::ParamSetValue;
664
665 let mut addr0: Vec<ParamSetValue> = Vec::new();
666 let mut extra: std::collections::HashMap<i32, Vec<ParamSetValue>> =
667 std::collections::HashMap::new();
668
669 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
670 self.array_counter += 1;
671
672 if let Some(param) = self.std_array_data_param {
674 use crate::ndarray::NDDataBuffer;
675 use asyn_rs::param::ParamValue;
676 let value = match &report_arr.data {
677 NDDataBuffer::I8(v) => {
678 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
679 }
680 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
681 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
682 ))),
683 NDDataBuffer::I16(v) => {
684 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
685 }
686 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
687 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
688 ))),
689 NDDataBuffer::I32(v) => {
690 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
691 }
692 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
693 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
694 ))),
695 NDDataBuffer::I64(v) => {
696 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
697 }
698 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
699 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
700 ))),
701 NDDataBuffer::F32(v) => {
702 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
703 }
704 NDDataBuffer::F64(v) => {
705 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
706 }
707 };
708 if let Some(value) = value {
709 let ts = report_arr.timestamp.to_system_time();
710 self.port_handle
711 .interrupts()
712 .notify(asyn_rs::interrupt::InterruptValue {
713 reason: param,
714 addr: 0,
715 value,
716 timestamp: ts,
717 uint32_changed_mask: 0,
718 ..Default::default()
719 });
720 }
721 }
722
723 let info = report_arr.info();
724 let color_mode = report_arr
728 .attributes
729 .get("ColorMode")
730 .and_then(|a| a.value.as_i64())
731 .map(|v| v as i32)
732 .unwrap_or(info.color_mode as i32);
733 let bayer_pattern = report_arr
734 .attributes
735 .get("BayerPattern")
736 .and_then(|a| a.value.as_i64())
737 .map(|v| v as i32)
738 .unwrap_or(0);
739
740 let cur_dims: Vec<i32> = report_arr.dims.iter().map(|d| d.size as i32).collect();
743 if cur_dims != self.dims_prev {
744 self.dims_prev = cur_dims.clone();
745 self.port_handle
746 .interrupts()
747 .notify(asyn_rs::interrupt::InterruptValue {
748 reason: self.ndarray_params.array_dimensions,
749 addr: 0,
750 value: asyn_rs::param::ParamValue::Int32Array(std::sync::Arc::from(
751 cur_dims.as_slice(),
752 )),
753 timestamp: report_arr.timestamp.to_system_time(),
754 uint32_changed_mask: 0,
755 ..Default::default()
756 });
757 }
758
759 addr0.extend([
760 ParamSetValue::Int32 {
761 reason: self.ndarray_params.array_counter,
762 addr: 0,
763 value: self.array_counter,
764 },
765 ParamSetValue::Int32 {
766 reason: self.ndarray_params.unique_id,
767 addr: 0,
768 value: report_arr.unique_id,
769 },
770 ParamSetValue::Int32 {
771 reason: self.ndarray_params.n_dimensions,
772 addr: 0,
773 value: report_arr.dims.len() as i32,
774 },
775 ParamSetValue::Int32 {
776 reason: self.ndarray_params.array_size_x,
777 addr: 0,
778 value: info.x_size as i32,
779 },
780 ParamSetValue::Int32 {
781 reason: self.ndarray_params.array_size_y,
782 addr: 0,
783 value: info.y_size as i32,
784 },
785 ParamSetValue::Int32 {
786 reason: self.ndarray_params.array_size_z,
787 addr: 0,
788 value: info.color_size as i32,
789 },
790 ParamSetValue::Int32 {
791 reason: self.ndarray_params.array_size,
792 addr: 0,
793 value: info.total_bytes as i32,
794 },
795 ParamSetValue::Int32 {
796 reason: self.ndarray_params.data_type,
797 addr: 0,
798 value: report_arr.data.data_type() as i32,
799 },
800 ParamSetValue::Int32 {
801 reason: self.ndarray_params.color_mode,
802 addr: 0,
803 value: color_mode,
804 },
805 ParamSetValue::Int32 {
806 reason: self.ndarray_params.bayer_pattern,
807 addr: 0,
808 value: bayer_pattern,
809 },
810 ParamSetValue::Float64 {
811 reason: self.ndarray_params.timestamp_rbv,
812 addr: 0,
813 value: report_arr.timestamp.as_f64(),
814 },
815 ParamSetValue::Int32 {
816 reason: self.ndarray_params.epics_ts_sec,
817 addr: 0,
818 value: report_arr.timestamp.sec as i32,
819 },
820 ParamSetValue::Int32 {
821 reason: self.ndarray_params.epics_ts_nsec,
822 addr: 0,
823 value: report_arr.timestamp.nsec as i32,
824 },
825 ]);
826 }
827
828 addr0.push(ParamSetValue::Float64 {
829 reason: self.plugin_params.execution_time,
830 addr: 0,
831 value: elapsed_ms,
832 });
833
834 for update in ¶m_updates {
839 match update {
840 ParamUpdate::Int32 {
841 reason,
842 addr,
843 value,
844 } => {
845 let pv = ParamSetValue::Int32 {
846 reason: *reason,
847 addr: *addr,
848 value: *value,
849 };
850 if *addr == 0 {
851 addr0.push(pv);
852 } else {
853 extra.entry(*addr).or_default().push(pv);
854 }
855 }
856 ParamUpdate::Float64 {
857 reason,
858 addr,
859 value,
860 } => {
861 let pv = ParamSetValue::Float64 {
862 reason: *reason,
863 addr: *addr,
864 value: *value,
865 };
866 if *addr == 0 {
867 addr0.push(pv);
868 } else {
869 extra.entry(*addr).or_default().push(pv);
870 }
871 }
872 ParamUpdate::Octet {
873 reason,
874 addr,
875 value,
876 } => {
877 let pv = ParamSetValue::Octet {
878 reason: *reason,
879 addr: *addr,
880 value: value.clone(),
881 };
882 if *addr == 0 {
883 addr0.push(pv);
884 } else {
885 extra.entry(*addr).or_default().push(pv);
886 }
887 }
888 ParamUpdate::Float64Array {
889 reason,
890 addr,
891 value,
892 } => {
893 let pv = ParamSetValue::Float64Array {
894 reason: *reason,
895 addr: *addr,
896 value: value.clone(),
897 };
898 if *addr == 0 {
899 addr0.push(pv);
900 } else {
901 extra.entry(*addr).or_default().push(pv);
902 }
903 }
904 }
905 }
906
907 ProcessOutput {
908 arrays: output_arrays,
909 scatter_index,
910 batch: ParamBatch { addr0, extra },
911 }
912 }
913}
914
915struct ProcessOutput {
917 arrays: Vec<Arc<NDArray>>,
918 scatter_index: Option<usize>,
919 batch: ParamBatch,
920}
921
922impl ProcessOutput {
923 async fn publish_arrays(&self, senders: &[NDArraySender]) {
929 for arr in &self.arrays {
930 if let Some(idx) = self.scatter_index {
931 if let Some(sender) = senders.get(idx % senders.len().max(1)) {
932 sender.publish(arr.clone()).await;
933 }
934 } else {
935 let futs = senders.iter().map(|s| s.publish(arr.clone()));
936 futures_util::future::join_all(futs).await;
937 }
938 }
939 }
940}
941
942struct ParamBatch {
945 addr0: Vec<asyn_rs::request::ParamSetValue>,
946 extra: std::collections::HashMap<i32, Vec<asyn_rs::request::ParamSetValue>>,
947}
948
949impl ParamBatch {
950 fn empty() -> Self {
951 Self {
952 addr0: Vec::new(),
953 extra: std::collections::HashMap::new(),
954 }
955 }
956
957 fn merge(&mut self, other: ParamBatch) {
958 self.addr0.extend(other.addr0);
959 for (addr, updates) in other.extra {
960 self.extra.entry(addr).or_default().extend(updates);
961 }
962 }
963
964 async fn flush(self, port: &asyn_rs::port_handle::PortHandle) {
966 if !self.addr0.is_empty() {
967 if let Err(e) = port.set_params_and_notify(0, self.addr0).await {
968 eprintln!("plugin param flush error (addr 0): {e}");
969 }
970 }
971 for (addr, updates) in self.extra {
972 if let Err(e) = port.set_params_and_notify(addr, updates).await {
973 eprintln!("plugin param flush error (addr {addr}): {e}");
974 }
975 }
976 }
977}
978
/// asyn port driver backing a plugin: owns the parameter library and
/// forwards parameter writes to the plugin's data loop.
#[allow(dead_code)]
pub struct PluginPortDriver {
    /// Underlying port driver state (parameter storage and callbacks).
    base: PortDriverBase,
    /// Indices of the common NDArray driver parameters.
    ndarray_params: NDArrayDriverParams,
    /// Indices of the plugin base parameters.
    plugin_params: PluginBaseParams,
    /// Channel on which `(reason, addr, value)` of each write is forwarded.
    param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
    /// Shared slot with the latest array, used to serve array reads.
    array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    /// Index of the `STD_ARRAY_DATA` parameter; present only with `array_data`.
    std_array_data_param: Option<usize>,
}

impl PluginPortDriver {
    /// Creates the driver and initialises its parameters.
    ///
    /// Creates the NDArray driver and plugin base parameters, sets their
    /// initial values, creates `STD_ARRAY_DATA` when `array_data` is
    /// provided, and finally lets `processor` register its own parameters.
    /// The upstream port parameter is only set when `ndarray_port` is
    /// non-empty.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while creating or setting a parameter,
    /// or by `processor.register_params`.
    fn new<P: NDPluginProcess>(
        port_name: &str,
        plugin_type_name: &str,
        queue_size: usize,
        ndarray_port: &str,
        max_addr: usize,
        param_change_tx: tokio::sync::mpsc::UnboundedSender<(usize, i32, ParamChangeValue)>,
        processor: &mut P,
        array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
    ) -> AsynResult<Self> {
        let mut base = PortDriverBase::new(
            port_name,
            max_addr,
            PortFlags {
                can_block: true,
                ..Default::default()
            },
        );

        // Parameters are created in a fixed order: NDArray driver params,
        // plugin base params, optional STD_ARRAY_DATA, then plugin-specific.
        let ndarray_params = NDArrayDriverParams::create(&mut base)?;
        let plugin_params = PluginBaseParams::create(&mut base)?;

        // Callbacks start disabled and non-blocking.
        base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
        base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
        base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
        base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
        base.set_int32_param(plugin_params.queue_use, 0, 0)?;
        base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
        base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
        base.set_int32_param(ndarray_params.write_file, 0, 0)?;
        base.set_int32_param(ndarray_params.read_file, 0, 0)?;
        base.set_int32_param(ndarray_params.capture, 0, 0)?;
        base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
        base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
        base.set_string_param(ndarray_params.file_path, 0, "".into())?;
        base.set_string_param(ndarray_params.file_name, 0, "".into())?;
        base.set_int32_param(ndarray_params.file_number, 0, 0)?;
        base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
        base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
        base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
        base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
        base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;

        base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
        // Both version strings report this crate's package version.
        base.set_string_param(
            ndarray_params.ad_core_version,
            0,
            env!("CARGO_PKG_VERSION").into(),
        )?;
        base.set_string_param(
            ndarray_params.driver_version,
            0,
            env!("CARGO_PKG_VERSION").into(),
        )?;
        if !ndarray_port.is_empty() {
            base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
        }

        // The array-data parameter exists only for plugins that expose a
        // shared array slot.
        let std_array_data_param = if array_data.is_some() {
            Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
        } else {
            None
        };

        // Plugin-specific parameters are registered last.
        processor.register_params(&mut base)?;

        Ok(Self {
            base,
            ndarray_params,
            plugin_params,
            param_change_tx,
            array_data,
            std_array_data_param,
        })
    }
}
1073
/// Copies as many leading elements of `src` as fit into `dst` and returns
/// the number copied (the shorter of the two lengths). Elements of `dst`
/// beyond that count are left untouched.
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = usize::min(src.len(), dst.len());
    let (head, _untouched) = dst.split_at_mut(count);
    head.copy_from_slice(&src[..count]);
    count
}
1080
1081fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
1083where
1084 S: CastToF64 + Copy,
1085 D: CastFromF64 + Copy,
1086{
1087 let n = src.len().min(dst.len());
1088 for i in 0..n {
1089 dst[i] = D::cast_from_f64(src[i].cast_to_f64());
1090 }
1091 n
1092}
1093
/// Integer-to-integer conversion with the semantics of an `as` cast
/// (wrapping/truncating on narrowing, sign- or zero-extending on widening).
trait CCastTo<D> {
    /// Converts `self` to `D` with `as`.
    fn ccast(self) -> D;
}

/// Implements [`CCastTo`] from one source type to each listed target type.
macro_rules! impl_ccast {
    ( $from:ty => $( $to:ty ),+ ) => {
        $(
            impl CCastTo<$to> for $from {
                #[inline]
                fn ccast(self) -> $to {
                    self as $to
                }
            }
        )+
    };
}

// Every integer buffer type to every signed target type of a different kind;
// the identical-type pairs are absent from these lists.
impl_ccast!(i8 => i16, i32, i64);
impl_ccast!(u8 => i8, i16, i32, i64);
impl_ccast!(i16 => i8, i32, i64);
impl_ccast!(u16 => i8, i16, i32, i64);
impl_ccast!(i32 => i8, i16, i64);
impl_ccast!(u32 => i8, i16, i32, i64);
impl_ccast!(i64 => i8, i16, i32);
impl_ccast!(u64 => i8, i16, i32, i64);
1132
1133fn copy_ccast<S, D>(src: &[S], dst: &mut [D]) -> usize
1137where
1138 S: CCastTo<D> + Copy,
1139 D: Copy,
1140{
1141 let n = src.len().min(dst.len());
1142 for i in 0..n {
1143 dst[i] = src[i].ccast();
1144 }
1145 n
1146}
1147
/// Conversion of a numeric element to `f64` using `as`-cast semantics.
trait CastToF64 {
    /// Returns `self` as an `f64`.
    fn cast_to_f64(self) -> f64;
}

/// Implements [`CastToF64`] with a plain `as f64` cast for each listed type.
macro_rules! impl_cast_to_f64_via_as {
    ( $( $num:ty ),+ $(,)? ) => {
        $(
            impl CastToF64 for $num {
                fn cast_to_f64(self) -> f64 {
                    self as f64
                }
            }
        )+
    };
}

impl_cast_to_f64_via_as!(i8, u8, i16, u16, i32, u32, i64, u64, f32);

impl CastToF64 for f64 {
    /// Identity conversion.
    fn cast_to_f64(self) -> f64 {
        self
    }
}
1203
/// Conversion of an `f64` to a numeric element using `as`-cast semantics
/// (integers truncate toward zero and saturate; NaN becomes `0`).
trait CastFromF64 {
    /// Builds `Self` from `v`.
    fn cast_from_f64(v: f64) -> Self;
}

/// Implements [`CastFromF64`] with a plain `as` cast for each listed type.
macro_rules! impl_cast_from_f64_via_as {
    ( $( $num:ty ),+ $(,)? ) => {
        $(
            impl CastFromF64 for $num {
                fn cast_from_f64(v: f64) -> Self {
                    v as $num
                }
            }
        )+
    };
}

impl_cast_from_f64_via_as!(i8, i16, i32, i64, f32);

impl CastFromF64 for f64 {
    /// Identity conversion.
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
1239
/// Body of a `read_*_array` method: copies the array currently held in
/// `$self.array_data` into `$buf`, converting the element type as needed.
///
/// * `$direct_variant` — buffer variant whose element type equals the
///   destination type; copied as-is.
/// * `ccast: [...]` — variants converted element-wise with `copy_ccast`.
/// * `convert: [...]` — variants converted element-wise with `copy_convert`.
///
/// Expands to an expression evaluating to `Ok(n)` with the number of
/// elements written. Contains `return Ok(0)` statements, so it must be used
/// inside a function returning a compatible `Result`: `Ok(0)` is returned
/// early when there is no array-data handle or the slot is empty.
macro_rules! impl_read_array {
    (
        $self:expr, $buf:expr, $direct_variant:ident,
        ccast: [ $( $ccast_variant:ident ),* ],
        convert: [ $( $variant:ident ),* ]
    ) => {{
        use crate::ndarray::NDDataBuffer;
        let handle = match &$self.array_data {
            Some(h) => h,
            None => return Ok(0),
        };
        // The slot stays locked for the duration of the copy.
        let guard = handle.lock();
        let array = match &*guard {
            Some(a) => a,
            None => return Ok(0),
        };
        // The three variant lists together must cover every buffer variant
        // for this match to be exhaustive.
        let n = match &array.data {
            NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
            $( NDDataBuffer::$ccast_variant(v) => copy_ccast(v, $buf), )*
            $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
        };
        Ok(n)
    }};
}
1266
1267impl PortDriver for PluginPortDriver {
1268 fn base(&self) -> &PortDriverBase {
1269 &self.base
1270 }
1271
1272 fn base_mut(&mut self) -> &mut PortDriverBase {
1273 &mut self.base
1274 }
1275
1276 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
1277 let reason = user.reason;
1278 let addr = user.addr;
1279 self.base.set_int32_param(reason, addr, value)?;
1280 self.base.call_param_callbacks(addr)?;
1281 let _ = self
1283 .param_change_tx
1284 .send((reason, addr, ParamChangeValue::Int32(value)));
1285 Ok(())
1286 }
1287
1288 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
1289 let reason = user.reason;
1290 let addr = user.addr;
1291 self.base.set_float64_param(reason, addr, value)?;
1292 self.base.call_param_callbacks(addr)?;
1293 let _ = self
1294 .param_change_tx
1295 .send((reason, addr, ParamChangeValue::Float64(value)));
1296 Ok(())
1297 }
1298
1299 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
1300 let reason = user.reason;
1301 let addr = user.addr;
1302 let s = String::from_utf8_lossy(data).into_owned();
1303 self.base.set_string_param(reason, addr, s.clone())?;
1304 self.base.call_param_callbacks(addr)?;
1305 let _ = self
1306 .param_change_tx
1307 .send((reason, addr, ParamChangeValue::Octet(s)));
1308 Ok(())
1309 }
1310
1311 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
1312 impl_read_array!(
1315 self, buf, I8,
1316 ccast: [U8, I16, U16, I32, U32, I64, U64],
1317 convert: [F32, F64]
1318 )
1319 }
1320
1321 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
1322 impl_read_array!(
1323 self, buf, I16,
1324 ccast: [I8, U8, U16, I32, U32, I64, U64],
1325 convert: [F32, F64]
1326 )
1327 }
1328
1329 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
1330 impl_read_array!(
1331 self, buf, I32,
1332 ccast: [I8, U8, I16, U16, U32, I64, U64],
1333 convert: [F32, F64]
1334 )
1335 }
1336
1337 fn read_int64_array(&mut self, _user: &AsynUser, buf: &mut [i64]) -> AsynResult<usize> {
1338 impl_read_array!(
1339 self, buf, I64,
1340 ccast: [I8, U8, I16, U16, I32, U32, U64],
1341 convert: [F32, F64]
1342 )
1343 }
1344
1345 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
1346 impl_read_array!(
1347 self, buf, F32,
1348 ccast: [],
1349 convert: [I8, U8, I16, U16, I32, U32, I64, U64, F64]
1350 )
1351 }
1352
1353 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
1354 impl_read_array!(
1355 self, buf, F64,
1356 ccast: [],
1357 convert: [I8, U8, I16, U16, I32, U32, I64, U64, F32]
1358 )
1359 }
1360}
1361
/// Cloneable handle to a running plugin: its port runtime, its array input
/// and output endpoints, and its parameter indices.
#[derive(Clone)]
pub struct PluginRuntimeHandle {
    /// Handle to the plugin's port runtime.
    port_runtime: PortRuntimeHandle,
    /// Sender used to feed arrays to the plugin.
    array_sender: NDArraySender,
    /// The plugin's output, shared behind a mutex.
    array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
    /// Name of the plugin's port.
    port_name: String,
    /// Indices of the common NDArray driver parameters.
    pub ndarray_params: NDArrayDriverParams,
    /// Indices of the plugin base parameters.
    pub plugin_params: PluginBaseParams,
}

impl PluginRuntimeHandle {
    /// Returns the handle to the plugin's port runtime.
    pub fn port_runtime(&self) -> &PortRuntimeHandle {
        &self.port_runtime
    }

    /// Returns the sender used to feed arrays to the plugin.
    pub fn array_sender(&self) -> &NDArraySender {
        &self.array_sender
    }

    /// Returns the plugin's shared output.
    pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
        &self.array_output
    }

    /// Returns the name of the plugin's port.
    pub fn port_name(&self) -> &str {
        &self.port_name
    }
}
1390
/// Creates a plugin runtime with a single asyn address.
///
/// Convenience wrapper around [`create_plugin_runtime_multi_addr`] that
/// passes `max_addr = 1`; all other arguments are forwarded unchanged.
/// Returns the runtime handle and the join handle of the spawned thread.
pub fn create_plugin_runtime<P: NDPluginProcess>(
    port_name: &str,
    processor: P,
    pool: Arc<NDArrayPool>,
    queue_size: usize,
    ndarray_port: &str,
    wiring: Arc<WiringRegistry>,
) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
    create_plugin_runtime_multi_addr(
        port_name,
        processor,
        pool,
        queue_size,
        ndarray_port,
        wiring,
        1,
    )
}
1415
1416pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
1420 port_name: &str,
1421 mut processor: P,
1422 pool: Arc<NDArrayPool>,
1423 queue_size: usize,
1424 ndarray_port: &str,
1425 wiring: Arc<WiringRegistry>,
1426 max_addr: usize,
1427) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1428 let (param_tx, param_rx) =
1433 tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
1434
1435 let plugin_type_name = processor.plugin_type().to_string();
1437 let compression_aware = processor.compression_aware();
1438 let array_data = processor.array_data_handle();
1439
1440 let driver = PluginPortDriver::new(
1442 port_name,
1443 &plugin_type_name,
1444 queue_size,
1445 ndarray_port,
1446 max_addr,
1447 param_tx,
1448 &mut processor,
1449 array_data,
1450 )
1451 .expect("failed to create plugin port driver");
1452
1453 let ndarray_params = driver.ndarray_params;
1454 let plugin_params = driver.plugin_params;
1455 let std_array_data_param = driver.std_array_data_param;
1456
1457 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1459
1460 let port_handle = port_runtime.port_handle().clone();
1462
1463 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1465
1466 let enabled = Arc::new(AtomicBool::new(false));
1468 let blocking_mode = Arc::new(AtomicBool::new(false));
1469
1470 let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
1472 let array_output_for_handle = array_output.clone();
1473 wiring.register_output_addrs(port_name, max_addr, array_output.clone());
1479 let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
1482 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1483 processor,
1484 output: array_output,
1485 pool,
1486 ndarray_params,
1487 plugin_params,
1488 port_handle,
1489 array_counter: 0,
1490 std_array_data_param,
1491 min_callback_time: 0.0,
1492 last_process_time: None,
1493 sort_mode: 0,
1494 sort_time: 0.0,
1495 sort_size: 10,
1496 sort_buffer: SortBuffer::new(),
1497 dropped_arrays: dropped_arrays_counter,
1498 compression_aware,
1499 max_byte_rate: 0.0,
1500 throttler: super::throttler::Throttler::new(0.0),
1501 prev_input_array: None,
1502 dims_prev: Vec::new(),
1503 nd_array_addr: 0,
1504 max_threads: 1,
1505 num_threads: 1,
1506 }));
1507
1508 let data_enabled = enabled.clone();
1509 let data_blocking = blocking_mode.clone();
1510
1511 let mut array_sender = array_sender;
1512 array_sender.set_mode_flags(enabled, blocking_mode);
1513
1514 let sender_port_name = port_name.to_string();
1516 let initial_upstream = ndarray_port.to_string();
1517
1518 let data_jh = thread::Builder::new()
1520 .name(format!("plugin-data-{port_name}"))
1521 .spawn(move || {
1522 plugin_data_loop(
1523 shared,
1524 array_rx,
1525 param_rx,
1526 plugin_params,
1527 ndarray_params.array_counter,
1528 data_enabled,
1529 data_blocking,
1530 sender_port_name,
1531 initial_upstream,
1532 wiring,
1533 );
1534 })
1535 .expect("failed to spawn plugin data thread");
1536
1537 let handle = PluginRuntimeHandle {
1538 port_runtime,
1539 array_sender,
1540 array_output: array_output_for_handle,
1541 port_name: port_name.to_string(),
1542 ndarray_params,
1543 plugin_params,
1544 };
1545
1546 (handle, data_jh)
1547}
1548
1549fn queue_status_batch(
1555 plugin_params: &PluginBaseParams,
1556 max_capacity: usize,
1557 free: i32,
1558) -> ParamBatch {
1559 use asyn_rs::request::ParamSetValue;
1560 ParamBatch {
1561 addr0: vec![
1562 ParamSetValue::Int32 {
1563 reason: plugin_params.queue_size,
1564 addr: 0,
1565 value: max_capacity as i32,
1566 },
1567 ParamSetValue::Int32 {
1568 reason: plugin_params.queue_use,
1569 addr: 0,
1570 value: free,
1571 },
1572 ],
1573 extra: std::collections::HashMap::new(),
1574 }
1575}
1576
1577async fn clamp_writeback(port: &PortHandle, reason: usize, value: i32) {
1580 use asyn_rs::request::ParamSetValue;
1581 let _ = port
1582 .set_params_and_notify(
1583 0,
1584 vec![ParamSetValue::Int32 {
1585 reason,
1586 addr: 0,
1587 value,
1588 }],
1589 )
1590 .await;
1591}
1592
1593fn plugin_data_loop<P: NDPluginProcess>(
1594 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
1595 mut array_rx: NDArrayReceiver,
1596 mut param_rx: tokio::sync::mpsc::UnboundedReceiver<(usize, i32, ParamChangeValue)>,
1597 plugin_params: PluginBaseParams,
1598 array_counter_reason: usize,
1599 enabled: Arc<AtomicBool>,
1600 blocking_mode: Arc<AtomicBool>,
1601 sender_port_name: String,
1602 initial_upstream: String,
1603 wiring: Arc<WiringRegistry>,
1604) {
1605 let enable_callbacks_reason = plugin_params.enable_callbacks;
1606 let blocking_callbacks_reason = plugin_params.blocking_callbacks;
1607 let min_callback_time_reason = plugin_params.min_callback_time;
1608 let sort_mode_reason = plugin_params.sort_mode;
1609 let sort_time_reason = plugin_params.sort_time;
1610 let sort_size_reason = plugin_params.sort_size;
1611 let nd_array_port_reason = plugin_params.nd_array_port;
1612 let nd_array_addr_reason = plugin_params.nd_array_addr;
1613 let process_plugin_reason = plugin_params.process_plugin;
1614 let max_byte_rate_reason = plugin_params.max_byte_rate;
1615 let num_threads_reason = plugin_params.num_threads;
1616 let max_threads_reason = plugin_params.max_threads;
1617 let mut current_upstream = initial_upstream;
1621 let mut current_addr: i32 = 0;
1622 let rt = tokio::runtime::Builder::new_current_thread()
1623 .enable_all()
1624 .build()
1625 .unwrap();
1626 rt.block_on(async {
1627 let mut sort_flush_interval = tokio::time::interval(std::time::Duration::from_secs(3600));
1630 let mut sort_flush_active = false;
1631 let mut last_queue_free: Option<i32> = None;
1634
1635 loop {
1636 tokio::select! {
1637 msg = array_rx.recv_msg() => {
1638 match msg {
1639 Some(msg) => {
1640 if !enabled.load(Ordering::Acquire) {
1645 continue;
1646 }
1647 let (process_output, senders, port) = {
1649 let mut guard = shared.lock();
1650 let compressed = msg.array.codec.is_some();
1654 let output = if compressed && !guard.compression_aware {
1655 guard
1656 .dropped_arrays
1657 .fetch_add(1, Ordering::AcqRel);
1658 Some(guard.dropped_arrays_only_batch())
1659 } else {
1660 guard.process_and_publish(&msg.array)
1665 };
1666 let senders = guard.output.lock().senders_clone();
1667 let port = guard.port_handle.clone();
1668 (output, senders, port)
1669 };
1670 let max_cap = array_rx.max_capacity();
1675 let free = max_cap.saturating_sub(array_rx.pending()) as i32;
1676 let queue_batch = if last_queue_free != Some(free) {
1677 last_queue_free = Some(free);
1678 Some(queue_status_batch(&plugin_params, max_cap, free))
1679 } else {
1680 None
1681 };
1682 if let Some(po) = process_output {
1685 po.publish_arrays(&senders).await;
1686 po.batch.flush(&port).await;
1687 }
1688 if let Some(qb) = queue_batch {
1689 qb.flush(&port).await;
1690 }
1691 }
1692 None => break,
1693 }
1694 }
1695 param = param_rx.recv() => {
1696 match param {
1697 Some((reason, addr, value)) => {
1698 if reason == enable_callbacks_reason {
1699 let on = value.as_i32() != 0;
1700 enabled.store(on, Ordering::Release);
1701 if !on {
1704 shared.lock().prev_input_array = None;
1705 }
1706 }
1707 if reason == blocking_callbacks_reason {
1708 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
1709 }
1710 if reason == min_callback_time_reason {
1712 shared.lock().min_callback_time = value.as_f64();
1713 }
1714 if reason == max_byte_rate_reason {
1717 let rate = value.as_f64();
1718 let mut guard = shared.lock();
1719 guard.max_byte_rate = rate;
1720 guard.throttler.reset(rate);
1721 }
1722 if reason == max_threads_reason {
1729 let (port, clamped, mt) = {
1731 let mut guard = shared.lock();
1732 guard.max_threads = value.as_i32().max(1);
1733 let clamped =
1734 guard.num_threads.clamp(1, guard.max_threads);
1735 guard.num_threads = clamped;
1736 (guard.port_handle.clone(), clamped, guard.max_threads)
1737 };
1738 clamp_writeback(&port, num_threads_reason, clamped).await;
1739 clamp_writeback(&port, max_threads_reason, mt).await;
1740 }
1741 if reason == num_threads_reason {
1742 let (port, clamped) = {
1743 let mut guard = shared.lock();
1744 let clamped =
1745 value.as_i32().clamp(1, guard.max_threads.max(1));
1746 guard.num_threads = clamped;
1747 (guard.port_handle.clone(), clamped)
1748 };
1749 clamp_writeback(&port, num_threads_reason, clamped).await;
1750 }
1751 if reason == nd_array_addr_reason {
1755 let new_addr = value.as_i32();
1756 if new_addr != current_addr {
1757 let old_key = upstream_key(¤t_upstream, current_addr);
1758 let new_key = upstream_key(¤t_upstream, new_addr);
1759 shared.lock().nd_array_addr = new_addr;
1760 match wiring.rewire_by_name(
1761 &sender_port_name,
1762 &old_key,
1763 &new_key,
1764 ) {
1765 Ok(()) => current_addr = new_addr,
1766 Err(e) => {
1767 eprintln!("NDArrayAddr reconnect failed: {e}");
1768 shared.lock().nd_array_addr = current_addr;
1769 }
1770 }
1771 }
1772 }
1773 if reason == process_plugin_reason && value.as_i32() != 0 {
1776 let (process_output, senders, port) = {
1777 let mut guard = shared.lock();
1778 let output = guard.process_plugin();
1779 let senders = guard.output.lock().senders_clone();
1780 let port = guard.port_handle.clone();
1781 (output, senders, port)
1782 };
1783 if let Some(po) = process_output {
1784 po.publish_arrays(&senders).await;
1785 po.batch.flush(&port).await;
1786 } else {
1787 eprintln!(
1788 "plugin {sender_port_name}: ProcessPlugin \
1789 requested but no input array cached"
1790 );
1791 }
1792 }
1793 if reason == array_counter_reason {
1797 shared.lock().array_counter = value.as_i32();
1798 }
1799 if reason == sort_mode_reason {
1801 let mode = value.as_i32();
1802 let flush_work = {
1805 let mut guard = shared.lock();
1806 guard.sort_mode = mode;
1807 if mode == 0 {
1808 let output = guard.flush_sort_buffer();
1809 let senders = guard.output.lock().senders_clone();
1810 let port = guard.port_handle.clone();
1811 sort_flush_active = false;
1812 Some((output, senders, port))
1813 } else {
1814 sort_flush_active = guard.sort_time > 0.0;
1815 if sort_flush_active {
1816 let dur = std::time::Duration::from_secs_f64(guard.sort_time);
1817 sort_flush_interval = tokio::time::interval(dur);
1818 }
1819 None
1820 }
1821 };
1822 if let Some((output, senders, port)) = flush_work {
1823 output.publish_arrays(&senders).await;
1824 output.batch.flush(&port).await;
1825 }
1826 }
1827 if reason == sort_time_reason {
1828 let t = value.as_f64();
1829 let mut guard = shared.lock();
1830 guard.sort_time = t;
1831 if guard.sort_mode != 0 && t > 0.0 {
1832 sort_flush_active = true;
1833 let dur = std::time::Duration::from_secs_f64(t);
1834 sort_flush_interval = tokio::time::interval(dur);
1835 } else {
1836 sort_flush_active = false;
1837 }
1838 drop(guard);
1839 }
1840 if reason == sort_size_reason {
1841 shared.lock().sort_size = value.as_i32();
1842 }
1843 if reason == nd_array_port_reason {
1845 if let Some(new_port) = value.as_string() {
1846 if new_port != current_upstream {
1847 let old_key =
1848 upstream_key(¤t_upstream, current_addr);
1849 let new_key = upstream_key(new_port, current_addr);
1850 match wiring.rewire_by_name(
1851 &sender_port_name,
1852 &old_key,
1853 &new_key,
1854 ) {
1855 Ok(()) => current_upstream = new_port.to_string(),
1856 Err(e) => {
1857 eprintln!("NDArrayPort rewire failed: {e}")
1858 }
1859 }
1860 }
1861 }
1862 }
1863 let snapshot = PluginParamSnapshot {
1864 enable_callbacks: enabled.load(Ordering::Acquire),
1865 reason,
1866 addr,
1867 value,
1868 };
1869 let (process_output, senders, port) = {
1870 let mut guard = shared.lock();
1871 let t0 = std::time::Instant::now();
1872 let result = guard.processor.on_param_change(reason, &snapshot);
1873 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
1874 let output = if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
1875 Some(guard.build_publish_batch(result.output_arrays, result.param_updates, None, None, elapsed_ms))
1876 } else {
1877 None
1878 };
1879 let senders = guard.output.lock().senders_clone();
1880 (output, senders, guard.port_handle.clone())
1881 };
1882 if let Some(po) = process_output {
1883 po.publish_arrays(&senders).await;
1884 po.batch.flush(&port).await;
1885 }
1886 }
1887 None => break,
1888 }
1889 }
1890 _ = sort_flush_interval.tick(), if sort_flush_active => {
1891 let (output, senders, port) = {
1894 let mut guard = shared.lock();
1895 let output = guard.tick_sort_buffer();
1896 let senders = guard.output.lock().senders_clone();
1897 let port = guard.port_handle.clone();
1898 (output, senders, port)
1899 };
1900 output.publish_arrays(&senders).await;
1901 output.batch.flush(&port).await;
1902 }
1903 }
1904 }
1905 });
1906}
1907
1908pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1915 upstream.array_output().lock().add(downstream_sender);
1916}
1917
1918pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1920 port_name: &str,
1921 mut processor: P,
1922 pool: Arc<NDArrayPool>,
1923 queue_size: usize,
1924 output: NDArrayOutput,
1925 ndarray_port: &str,
1926 wiring: Arc<WiringRegistry>,
1927) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1928 let (param_tx, param_rx) =
1932 tokio::sync::mpsc::unbounded_channel::<(usize, i32, ParamChangeValue)>();
1933
1934 let plugin_type_name = processor.plugin_type().to_string();
1935 let compression_aware = processor.compression_aware();
1936 let array_data = processor.array_data_handle();
1937 let driver = PluginPortDriver::new(
1938 port_name,
1939 &plugin_type_name,
1940 queue_size,
1941 ndarray_port,
1942 1,
1943 param_tx,
1944 &mut processor,
1945 array_data,
1946 )
1947 .expect("failed to create plugin port driver");
1948
1949 let ndarray_params = driver.ndarray_params;
1950 let plugin_params = driver.plugin_params;
1951 let std_array_data_param = driver.std_array_data_param;
1952
1953 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1954
1955 let port_handle = port_runtime.port_handle().clone();
1956
1957 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1958
1959 let enabled = Arc::new(AtomicBool::new(false));
1960 let blocking_mode = Arc::new(AtomicBool::new(false));
1961
1962 let array_output = Arc::new(parking_lot::Mutex::new(output));
1963 let array_output_for_handle = array_output.clone();
1964 wiring.register_output(port_name, array_output.clone());
1968 let dropped_arrays_counter = array_sender.dropped_arrays_counter().clone();
1970 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1971 processor,
1972 output: array_output,
1973 pool,
1974 ndarray_params,
1975 plugin_params,
1976 port_handle,
1977 array_counter: 0,
1978 std_array_data_param,
1979 min_callback_time: 0.0,
1980 last_process_time: None,
1981 sort_mode: 0,
1982 sort_time: 0.0,
1983 sort_size: 10,
1984 sort_buffer: SortBuffer::new(),
1985 dropped_arrays: dropped_arrays_counter,
1986 compression_aware,
1987 max_byte_rate: 0.0,
1988 throttler: super::throttler::Throttler::new(0.0),
1989 prev_input_array: None,
1990 dims_prev: Vec::new(),
1991 nd_array_addr: 0,
1992 max_threads: 1,
1993 num_threads: 1,
1994 }));
1995
1996 let data_enabled = enabled.clone();
1997 let data_blocking = blocking_mode.clone();
1998
1999 let mut array_sender = array_sender;
2000 array_sender.set_mode_flags(enabled, blocking_mode);
2001
2002 let sender_port_name = port_name.to_string();
2004 let initial_upstream = ndarray_port.to_string();
2005
2006 let data_jh = thread::Builder::new()
2007 .name(format!("plugin-data-{port_name}"))
2008 .spawn(move || {
2009 plugin_data_loop(
2010 shared,
2011 array_rx,
2012 param_rx,
2013 plugin_params,
2014 ndarray_params.array_counter,
2015 data_enabled,
2016 data_blocking,
2017 sender_port_name,
2018 initial_upstream,
2019 wiring,
2020 );
2021 })
2022 .expect("failed to spawn plugin data thread");
2023
2024 let handle = PluginRuntimeHandle {
2025 port_runtime,
2026 array_sender,
2027 array_output: array_output_for_handle,
2028 port_name: port_name.to_string(),
2029 ndarray_params,
2030 plugin_params,
2031 };
2032
2033 (handle, data_jh)
2034}
2035
2036#[cfg(test)]
2037mod tests {
2038 use super::*;
2039 use crate::ndarray::{NDDataType, NDDimension};
2040 use crate::plugin::channel::ndarray_channel;
2041
2042 struct PassthroughProcessor;
2044
2045 impl NDPluginProcess for PassthroughProcessor {
2046 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2047 ProcessResult::arrays(vec![Arc::new(array.clone())])
2048 }
2049 fn plugin_type(&self) -> &str {
2050 "Passthrough"
2051 }
2052 }
2053
    /// Test processor that consumes arrays without producing any output.
    struct SinkProcessor {
        /// Number of arrays passed to `process_array` so far.
        count: usize,
    }

    impl NDPluginProcess for SinkProcessor {
        /// Counts the array and returns an empty result.
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count += 1;
            ProcessResult::empty()
        }
        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }
2068
2069 fn make_test_array(id: i32) -> Arc<NDArray> {
2070 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
2071 arr.unique_id = id;
2072 Arc::new(arr)
2073 }
2074
2075 fn test_wiring() -> Arc<WiringRegistry> {
2076 Arc::new(WiringRegistry::new())
2077 }
2078
2079 fn enable_callbacks(handle: &PluginRuntimeHandle) {
2081 handle
2082 .port_runtime()
2083 .port_handle()
2084 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
2085 .unwrap();
2086 std::thread::sleep(std::time::Duration::from_millis(10));
2087 }
2088
2089 fn send_array(sender: &NDArraySender, array: Arc<NDArray>) {
2093 let sender = sender.clone();
2094 let jh = std::thread::spawn(move || {
2095 let rt = tokio::runtime::Builder::new_current_thread()
2096 .enable_all()
2097 .build()
2098 .unwrap();
2099 rt.block_on(sender.publish(array));
2100 });
2101 jh.join().unwrap();
2102 }
2103
2104 #[test]
2105 fn test_passthrough_runtime() {
2106 let pool = Arc::new(NDArrayPool::new(1_000_000));
2107
2108 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2110 let mut output = NDArrayOutput::new();
2111 output.add(downstream_sender);
2112
2113 let (handle, _data_jh) = create_plugin_runtime_with_output(
2114 "PASS1",
2115 PassthroughProcessor,
2116 pool,
2117 10,
2118 output,
2119 "",
2120 test_wiring(),
2121 );
2122 enable_callbacks(&handle);
2123
2124 send_array(handle.array_sender(), make_test_array(42));
2126
2127 let received = downstream_rx.blocking_recv().unwrap();
2129 assert_eq!(received.unique_id, 42);
2130 }
2131
2132 #[test]
2133 fn test_sink_runtime() {
2134 let pool = Arc::new(NDArrayPool::new(1_000_000));
2135
2136 let (handle, _data_jh) = create_plugin_runtime(
2137 "SINK1",
2138 SinkProcessor { count: 0 },
2139 pool,
2140 10,
2141 "",
2142 test_wiring(),
2143 );
2144 enable_callbacks(&handle);
2145
2146 send_array(handle.array_sender(), make_test_array(1));
2148 send_array(handle.array_sender(), make_test_array(2));
2149
2150 std::thread::sleep(std::time::Duration::from_millis(50));
2152
2153 assert_eq!(handle.port_name(), "SINK1");
2155 }
2156
2157 #[test]
2158 fn test_plugin_type_param() {
2159 let pool = Arc::new(NDArrayPool::new(1_000_000));
2160
2161 let (handle, _data_jh) = create_plugin_runtime(
2162 "TYPE_TEST",
2163 PassthroughProcessor,
2164 pool,
2165 10,
2166 "",
2167 test_wiring(),
2168 );
2169
2170 assert_eq!(handle.port_name(), "TYPE_TEST");
2172 assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
2173 }
2174
2175 #[test]
2176 fn test_shutdown_on_handle_drop() {
2177 let pool = Arc::new(NDArrayPool::new(1_000_000));
2178
2179 let (handle, data_jh) = create_plugin_runtime(
2180 "SHUTDOWN_TEST",
2181 PassthroughProcessor,
2182 pool,
2183 10,
2184 "",
2185 test_wiring(),
2186 );
2187
2188 let sender = handle.array_sender().clone();
2190 drop(handle);
2191 drop(sender);
2192
2193 let result = data_jh.join();
2195 assert!(result.is_ok());
2196 }
2197
2198 #[test]
2199 fn test_wire_to_nonzero_ndarray_addr() {
2200 use crate::plugin::wiring::upstream_key;
2206 let pool = Arc::new(NDArrayPool::new(1_000_000));
2207 let wiring = test_wiring();
2208
2209 let (up_handle, _up_jh) = create_plugin_runtime_multi_addr(
2211 "UP_MULTI",
2212 PassthroughProcessor,
2213 pool,
2214 10,
2215 "",
2216 wiring.clone(),
2217 2,
2218 );
2219 enable_callbacks(&up_handle);
2220
2221 let addr0 = wiring.lookup_output("UP_MULTI");
2223 let addr1 = wiring.lookup_output(&upstream_key("UP_MULTI", 1));
2224 assert!(addr0.is_some(), "addr 0 output must be registered");
2225 assert!(
2226 addr1.is_some(),
2227 "addr 1 output must be registered for a max_addr=2 port"
2228 );
2229
2230 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWN_ADDR1", 10);
2232 wiring
2233 .rewire(&downstream_sender, "", &upstream_key("UP_MULTI", 1))
2234 .expect("wiring a consumer to NDArrayAddr=1 must succeed");
2235
2236 send_array(up_handle.array_sender(), make_test_array(99));
2238 let received = downstream_rx.blocking_recv().unwrap();
2239 assert_eq!(
2240 received.unique_id, 99,
2241 "consumer wired to NDArrayAddr=1 must receive upstream arrays"
2242 );
2243 }
2244
2245 #[test]
2246 fn test_nonblocking_passthrough() {
2247 let pool = Arc::new(NDArrayPool::new(1_000_000));
2248 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2249 let mut output = NDArrayOutput::new();
2250 output.add(downstream_sender);
2251
2252 let (handle, _data_jh) = create_plugin_runtime_with_output(
2253 "NB_TEST",
2254 PassthroughProcessor,
2255 pool,
2256 10,
2257 output,
2258 "",
2259 test_wiring(),
2260 );
2261 enable_callbacks(&handle);
2262
2263 send_array(handle.array_sender(), make_test_array(42));
2264
2265 let received = downstream_rx.blocking_recv().unwrap();
2266 assert_eq!(received.unique_id, 42);
2267 }
2268
2269 #[test]
2270 fn test_blocking_to_nonblocking_switch() {
2271 let pool = Arc::new(NDArrayPool::new(1_000_000));
2272 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2273 let mut output = NDArrayOutput::new();
2274 output.add(downstream_sender);
2275
2276 let (handle, _data_jh) = create_plugin_runtime_with_output(
2277 "SWITCH_TEST",
2278 PassthroughProcessor,
2279 pool,
2280 10,
2281 output,
2282 "",
2283 test_wiring(),
2284 );
2285 enable_callbacks(&handle);
2286
2287 handle
2289 .port_runtime()
2290 .port_handle()
2291 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
2292 .unwrap();
2293 std::thread::sleep(std::time::Duration::from_millis(50));
2294
2295 send_array(handle.array_sender(), make_test_array(1));
2296 let received = downstream_rx.blocking_recv().unwrap();
2297 assert_eq!(received.unique_id, 1);
2298
2299 handle
2301 .port_runtime()
2302 .port_handle()
2303 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
2304 .unwrap();
2305 std::thread::sleep(std::time::Duration::from_millis(50));
2306
2307 send_array(handle.array_sender(), make_test_array(2));
2309 let received = downstream_rx.blocking_recv().unwrap();
2310 assert_eq!(received.unique_id, 2);
2311 }
2312
2313 #[test]
2314 fn test_enable_callbacks_disables_processing() {
2315 let pool = Arc::new(NDArrayPool::new(1_000_000));
2316 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2317 let mut output = NDArrayOutput::new();
2318 output.add(downstream_sender);
2319
2320 let (handle, _data_jh) = create_plugin_runtime_with_output(
2321 "ENABLE_TEST",
2322 PassthroughProcessor,
2323 pool,
2324 10,
2325 output,
2326 "",
2327 test_wiring(),
2328 );
2329
2330 handle
2332 .port_runtime()
2333 .port_handle()
2334 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
2335 .unwrap();
2336 std::thread::sleep(std::time::Duration::from_millis(50));
2337
2338 send_array(handle.array_sender(), make_test_array(99));
2340
2341 let rt = tokio::runtime::Builder::new_current_thread()
2343 .enable_all()
2344 .build()
2345 .unwrap();
2346 let result = rt.block_on(async {
2347 tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
2348 });
2349 assert!(
2350 result.is_err(),
2351 "should not receive array when callbacks disabled"
2352 );
2353 }
2354
2355 #[test]
2356 fn test_downstream_receives_multiple() {
2357 let pool = Arc::new(NDArrayPool::new(1_000_000));
2358
2359 let (ds1, mut rx1) = ndarray_channel("DS1", 10);
2360 let (ds2, mut rx2) = ndarray_channel("DS2", 10);
2361 let mut output = NDArrayOutput::new();
2362 output.add(ds1);
2363 output.add(ds2);
2364
2365 let (handle, _data_jh) = create_plugin_runtime_with_output(
2366 "DS_TEST",
2367 PassthroughProcessor,
2368 pool,
2369 10,
2370 output,
2371 "",
2372 test_wiring(),
2373 );
2374 enable_callbacks(&handle);
2375
2376 send_array(handle.array_sender(), make_test_array(77));
2377
2378 let r1 = rx1.blocking_recv().unwrap();
2380 let r2 = rx2.blocking_recv().unwrap();
2381 assert_eq!(r1.unique_id, 77);
2382 assert_eq!(r2.unique_id, 77);
2383 }
2384
2385 #[test]
2386 fn test_param_updates_after_send() {
2387 let pool = Arc::new(NDArrayPool::new(1_000_000));
2388
2389 struct ParamTracker;
2390 impl NDPluginProcess for ParamTracker {
2391 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
2392 ProcessResult::arrays(vec![Arc::new(array.clone())])
2393 }
2394 fn plugin_type(&self) -> &str {
2395 "ParamTracker"
2396 }
2397 }
2398
2399 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2400 let mut output = NDArrayOutput::new();
2401 output.add(downstream_sender);
2402
2403 let (handle, _data_jh) = create_plugin_runtime_with_output(
2404 "PARAM_TEST",
2405 ParamTracker,
2406 pool,
2407 10,
2408 output,
2409 "",
2410 test_wiring(),
2411 );
2412 enable_callbacks(&handle);
2413
2414 send_array(handle.array_sender(), make_test_array(1));
2416 let received = downstream_rx.blocking_recv().unwrap();
2417 assert_eq!(received.unique_id, 1);
2418
2419 handle
2421 .port_runtime()
2422 .port_handle()
2423 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
2424 .unwrap();
2425 std::thread::sleep(std::time::Duration::from_millis(50));
2426
2427 send_array(handle.array_sender(), make_test_array(2));
2429 let received = downstream_rx.blocking_recv().unwrap();
2430 assert_eq!(received.unique_id, 2);
2431 }
2432
2433 #[test]
2434 fn test_sort_buffer_reorders_by_unique_id() {
2435 let mut buf = SortBuffer::new();
2436
2437 buf.insert(3, vec![make_test_array(3)], 10);
2439 buf.insert(1, vec![make_test_array(1)], 10);
2440 buf.insert(2, vec![make_test_array(2)], 10);
2441
2442 assert_eq!(buf.len(), 3);
2443
2444 let drained = buf.drain_all();
2445 let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2446 assert_eq!(ids, vec![1, 2, 3], "should drain in sorted uniqueId order");
2447 assert_eq!(buf.len(), 0);
2448 assert_eq!(buf.prev_unique_id, 3);
2449 }
2450
2451 #[test]
2452 fn test_sort_buffer_drain_ready_contiguous() {
2453 let mut buf = SortBuffer::new();
2456 buf.note_emitted(0);
2459 buf.insert(1, vec![make_test_array(1)], 10);
2460 buf.insert(2, vec![make_test_array(2)], 10);
2461 buf.insert(5, vec![make_test_array(5)], 10); let drained = buf.drain_ready(100.0);
2465 let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2466 assert_eq!(ids, vec![1, 2], "contiguous run released; id=5 held by gap");
2467 assert_eq!(buf.len(), 1);
2468 }
2469
2470 #[test]
2471 fn test_sort_buffer_drain_ready_deadline() {
2472 let mut buf = SortBuffer::new();
2474 buf.note_emitted(1); buf.insert(5, vec![make_test_array(5)], 10); std::thread::sleep(std::time::Duration::from_millis(30));
2477 let drained = buf.drain_ready(0.01);
2479 let ids: Vec<i32> = drained.iter().map(|(id, _)| *id).collect();
2480 assert_eq!(ids, vec![5], "stale head released via deadline");
2481 }
2482
2483 #[test]
2484 fn test_sort_buffer_detects_disordered_on_emit() {
2485 let mut buf = SortBuffer::new();
2487 buf.note_emitted(5); buf.note_emitted(3); assert_eq!(buf.disordered_arrays, 1);
2490 buf.note_emitted(4); assert_eq!(buf.disordered_arrays, 1);
2492 }
2493
2494 #[test]
2495 fn test_sort_buffer_drops_when_full() {
2496 let mut buf = SortBuffer::new();
2497
2498 assert!(buf.insert(1, vec![make_test_array(1)], 2));
2500 assert!(buf.insert(2, vec![make_test_array(2)], 2));
2501 assert!(!buf.insert(3, vec![make_test_array(3)], 2));
2502
2503 assert_eq!(buf.len(), 2);
2504 assert_eq!(buf.dropped_output_arrays, 1);
2505 }
2506
    /// End-to-end check of sort mode in the running plugin.
    ///
    /// Configures `sort_size = 10`, `sort_time = 0.1` and `sort_mode = 1`,
    /// then verifies that in-order arrays 1, 2, 3 are delivered promptly and
    /// that arrays sent as 5 then 4 are delivered downstream as 4 then 5.
    #[test]
    fn test_sort_mode_runtime_integration() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let mut output = NDArrayOutput::new();
        output.add(downstream_sender);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "SORT_TEST",
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        // Sort size and time are written before sort mode is switched on.
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.sort_size, 0, 10)
            .unwrap();
        handle
            .port_runtime()
            .port_handle()
            .write_float64_blocking(handle.plugin_params.sort_time, 0, 0.1)
            .unwrap();
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(handle.plugin_params.sort_mode, 0, 1)
            .unwrap();
        // Give the data loop time to apply the parameter writes.
        std::thread::sleep(std::time::Duration::from_millis(50));

        // In-order arrays.
        send_array(handle.array_sender(), make_test_array(1));
        send_array(handle.array_sender(), make_test_array(2));
        send_array(handle.array_sender(), make_test_array(3));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        // The first array must arrive within 50 ms, i.e. without waiting for
        // the 0.1 s sort time.
        let fast = rt.block_on(async {
            tokio::time::timeout(std::time::Duration::from_millis(50), downstream_rx.recv()).await
        });
        assert!(
            fast.is_ok(),
            "in-order arrays must be emitted immediately, not buffered"
        );
        assert_eq!(fast.unwrap().unwrap().unique_id, 1);
        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 2);
        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 3);

        // Out-of-order pair: 5 is sent before 4 and must come out after it.
        send_array(handle.array_sender(), make_test_array(5));
        send_array(handle.array_sender(), make_test_array(4));
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 4);
        assert_eq!(downstream_rx.blocking_recv().unwrap().unique_id, 5);
    }
2574
2575 #[test]
2576 fn test_throttle_drops_output_arrays() {
2577 let pool = Arc::new(NDArrayPool::new(1_000_000));
2580 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
2581 let mut output = NDArrayOutput::new();
2582 output.add(downstream_sender);
2583
2584 let (handle, _data_jh) = create_plugin_runtime_with_output(
2585 "THROTTLE_TEST",
2586 PassthroughProcessor,
2587 pool,
2588 10,
2589 output,
2590 "",
2591 test_wiring(),
2592 );
2593 enable_callbacks(&handle);
2594
2595 handle
2598 .port_runtime()
2599 .port_handle()
2600 .write_float64_blocking(handle.plugin_params.max_byte_rate, 0, 8.0)
2601 .unwrap();
2602 std::thread::sleep(std::time::Duration::from_millis(20));
2603
2604 for id in 1..=5 {
2605 send_array(handle.array_sender(), make_test_array(id));
2606 }
2607 std::thread::sleep(std::time::Duration::from_millis(50));
2608
2609 let rt = tokio::runtime::Builder::new_current_thread()
2611 .enable_all()
2612 .build()
2613 .unwrap();
2614 let mut received = 0;
2615 while rt
2616 .block_on(async {
2617 tokio::time::timeout(std::time::Duration::from_millis(20), downstream_rx.recv())
2618 .await
2619 })
2620 .map(|o| o.is_some())
2621 .unwrap_or(false)
2622 {
2623 received += 1;
2624 }
2625 assert!(
2626 received < 5,
2627 "throttle must drop some arrays (got {received})"
2628 );
2629 assert!(received >= 1, "first array within budget must pass");
2630 }
2631
/// Writing `1` to the `process_plugin` parameter must make the plugin emit the
/// most recently received array again.
///
/// Every wait on the downstream channel is bounded by a timeout. With an
/// unbounded `blocking_recv()` a regression in which the array is never
/// (re-)emitted would hang the whole test run instead of producing a failure.
#[test]
fn test_process_plugin_reprocesses_last_input() {
    // Upper bound for any single wait on the downstream channel. Generous on
    // purpose: it is only hit when the behaviour under test is broken.
    let max_wait = std::time::Duration::from_secs(5);

    let pool = Arc::new(NDArrayPool::new(1_000_000));
    let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut output = NDArrayOutput::new();
    output.add(downstream_sender);

    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "PROCESS_PLUGIN_TEST",
        PassthroughProcessor,
        pool,
        10,
        output,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    // The timeout must be created inside the runtime context, hence the
    // `async` blocks handed to `block_on` below.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Baseline: the array passes through once and becomes the "last input".
    send_array(handle.array_sender(), make_test_array(7));
    let first = rt
        .block_on(async { tokio::time::timeout(max_wait, downstream_rx.recv()).await })
        .expect("timed out waiting for the initial array")
        .expect("downstream channel closed before the initial array arrived");
    assert_eq!(first.unique_id, 7);

    // Trigger ProcessPlugin; no new array is sent after this point.
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(handle.plugin_params.process_plugin, 0, 1)
        .unwrap();

    let reprocessed = rt
        .block_on(async { tokio::time::timeout(max_wait, downstream_rx.recv()).await })
        .expect("timed out waiting for ProcessPlugin to re-emit the last input")
        .expect("downstream channel closed before the re-emitted array arrived");
    assert_eq!(
        reprocessed.unique_id, 7,
        "ProcessPlugin re-emits last input"
    );
}
2666
/// With `min_callback_time` set to 10.0, two arrays sent back-to-back must
/// result in only the first one reaching the downstream channel; the second
/// is expected to be throttled away.
#[test]
fn test_min_callback_time_drop_counts() {
    use std::time::Duration;

    let (tx, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut fan_out = NDArrayOutput::new();
    fan_out.add(tx);

    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "MIN_CB_TEST",
        PassthroughProcessor,
        Arc::new(NDArrayPool::new(1_000_000)),
        10,
        fan_out,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    // Install the throttle, then give the plugin a moment to pick it up.
    let min_cb_reason = handle.plugin_params.min_callback_time;
    handle
        .port_runtime()
        .port_handle()
        .write_float64_blocking(min_cb_reason, 0, 10.0)
        .unwrap();
    std::thread::sleep(Duration::from_millis(20));

    // Two arrays in quick succession, followed by a pause for processing.
    for id in [1, 2] {
        send_array(handle.array_sender(), make_test_array(id));
    }
    std::thread::sleep(Duration::from_millis(50));

    // The first array must come through ...
    let first = downstream_rx.blocking_recv().unwrap();
    assert_eq!(first.unique_id, 1);

    // ... while waiting for a second one has to run into the timeout.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let second = rt.block_on(async {
        let pending = downstream_rx.recv();
        tokio::time::timeout(Duration::from_millis(50), pending).await
    });
    assert!(
        second.is_err(),
        "second array throttled out by MinCallbackTime"
    );
}
2713
/// ProcessPlugin must re-inject the last array that was actually processed,
/// not a later array that was rejected by the `min_callback_time` throttle.
///
/// Every wait on the downstream channel is bounded by a timeout. With an
/// unbounded `blocking_recv()` a regression in which nothing is (re-)emitted
/// would hang the whole test run instead of producing a failure.
#[test]
fn test_process_plugin_skips_throttled_input() {
    // Upper bound for any single wait on the downstream channel. Generous on
    // purpose: it is only hit when the behaviour under test is broken.
    let max_wait = std::time::Duration::from_secs(5);

    let pool = Arc::new(NDArrayPool::new(1_000_000));
    let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut output = NDArrayOutput::new();
    output.add(downstream_sender);

    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "PROCESS_THROTTLE_TEST",
        PassthroughProcessor,
        pool,
        10,
        output,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    // The timeout must be created inside the runtime context, hence the
    // `async` blocks handed to `block_on` below.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Throttle hard so that the second of two back-to-back arrays is rejected.
    handle
        .port_runtime()
        .port_handle()
        .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 10.0)
        .unwrap();
    std::thread::sleep(std::time::Duration::from_millis(20));

    send_array(handle.array_sender(), make_test_array(1));
    send_array(handle.array_sender(), make_test_array(2));
    std::thread::sleep(std::time::Duration::from_millis(50));

    let first = rt
        .block_on(async { tokio::time::timeout(max_wait, downstream_rx.recv()).await })
        .expect("timed out waiting for the first (unthrottled) array")
        .expect("downstream channel closed before the first array arrived");
    assert_eq!(first.unique_id, 1);

    // Lift the throttle before triggering ProcessPlugin, so the re-injected
    // array is not itself rejected by the throttle.
    handle
        .port_runtime()
        .port_handle()
        .write_float64_blocking(handle.plugin_params.min_callback_time, 0, 0.0)
        .unwrap();
    std::thread::sleep(std::time::Duration::from_millis(20));
    handle
        .port_runtime()
        .port_handle()
        .write_int32_blocking(handle.plugin_params.process_plugin, 0, 1)
        .unwrap();

    let reprocessed = rt
        .block_on(async { tokio::time::timeout(max_wait, downstream_rx.recv()).await })
        .expect("timed out waiting for ProcessPlugin to re-inject an array")
        .expect("downstream channel closed before the re-injected array arrived");
    assert_eq!(
        reprocessed.unique_id, 1,
        "ProcessPlugin must re-inject the last processed array (1), not the throttled array (2)"
    );
}
2772
/// An array carrying a codec (i.e. compressed data) sent to a
/// `PassthroughProcessor` plugin must not reach downstream; the raw array
/// sent right after it must be the first thing the downstream receiver sees.
#[test]
fn test_g3_compressed_array_dropped_on_non_aware_plugin() {
    let (tx, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
    let mut fan_out = NDArrayOutput::new();
    fan_out.add(tx);

    let (handle, _data_jh) = create_plugin_runtime_with_output(
        "G3_TEST",
        PassthroughProcessor,
        Arc::new(NDArrayPool::new(1_000_000)),
        10,
        fan_out,
        "",
        test_wiring(),
    );
    enable_callbacks(&handle);

    // Array #1: tagged with a JPEG codec, so it counts as compressed.
    let compressed = {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = 1;
        arr.codec = Some(crate::codec::Codec {
            name: crate::codec::CodecName::JPEG,
            compressed_size: 16,
            level: 0,
            shuffle: 0,
            compressor: 0,
        });
        Arc::new(arr)
    };
    send_array(handle.array_sender(), compressed);

    // Array #2: a plain, uncompressed test array.
    send_array(handle.array_sender(), make_test_array(2));

    let delivered = downstream_rx.blocking_recv().unwrap();
    assert_eq!(
        delivered.unique_id, 2,
        "compressed array dropped; only the raw array reaches downstream"
    );
}
2813
/// Sending a burst of arrays to a plugin whose processor takes 200 ms per
/// array must bump the sender's dropped-arrays counter at least once.
#[test]
fn test_drop_on_full_increments_dropped_counter() {
    /// Processor that stalls on every array and produces no output.
    struct SlowProcessor;

    impl NDPluginProcess for SlowProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            std::thread::sleep(std::time::Duration::from_millis(200));
            ProcessResult::empty()
        }

        fn plugin_type(&self) -> &str {
            "Slow"
        }
    }

    // The `1` is presumably the input queue depth (NOTE(review): confirm
    // against `create_plugin_runtime`), so a burst overflows it quickly.
    let (downstream_handle, _ds_jh) = create_plugin_runtime(
        "B1_DOWNSTREAM",
        SlowProcessor,
        Arc::new(NDArrayPool::new(1_000_000)),
        1,
        "",
        test_wiring(),
    );
    enable_callbacks(&downstream_handle);

    let ds_sender = downstream_handle.array_sender().clone();
    let dropped = ds_sender.dropped_arrays_counter().clone();

    // Burst of four arrays with ids 1 through 4.
    for id in 1..=4 {
        send_array(&ds_sender, make_test_array(id));
    }

    let dropped_count = dropped.load(Ordering::Acquire);
    assert!(
        dropped_count >= 1,
        "arrays dropped on a full queue must be counted (got {})",
        dropped_count
    );
}
2851
/// Pins the narrowing behaviour of the two element-copy helpers:
/// `copy_ccast` keeps the low-order bits of each element (C-cast style),
/// whereas `copy_convert` saturates on out-of-range integers and truncates
/// floats toward zero.
#[test]
fn test_cross_width_narrowing_array_read_truncates() {
    // u16 -> i8 via C-cast: 300 = 0x012C, low byte 0x2C = 44; one element copied.
    {
        let mut dst = [0i8; 1];
        let copied = copy_ccast(&[300u16], &mut dst);
        assert_eq!(copied, 1);
        assert_eq!(dst[0], 44, "(epicsInt8)(epicsUInt16)300 == 44 (low 8 bits)");
    }

    // Same input through `copy_convert` saturates instead.
    {
        let mut dst = [0i8; 1];
        copy_convert(&[300u16], &mut dst);
        assert_eq!(dst[0], 127, "f64 round-trip saturates — the wrong behavior");
    }

    // i32 -> i8: only the lowest byte survives.
    {
        let mut dst = [0i8; 1];
        copy_ccast(&[0x1234_5678i32], &mut dst);
        assert_eq!(dst[0], 0x78);
    }

    // i32 -> i8: -1 stays -1.
    {
        let mut dst = [0i8; 1];
        copy_ccast(&[-1i32], &mut dst);
        assert_eq!(dst[0], -1);
    }

    // u16 -> i8: 255 reinterprets as -1.
    {
        let mut dst = [0i8; 1];
        copy_ccast(&[255u16], &mut dst);
        assert_eq!(dst[0], -1);
    }

    // i64 -> i32: the upper 32 bits are discarded.
    {
        let mut dst = [0i32; 1];
        copy_ccast(&[0x0000_0001_0000_002Ai64], &mut dst);
        assert_eq!(dst[0], 42);
    }

    // u32 -> i16: 70000 mod 65536 = 4464.
    {
        let mut dst = [0i16; 1];
        copy_ccast(&[70000u32], &mut dst);
        assert_eq!(dst[0], 4464);
    }

    // u8 -> i8: same width, sign reinterpretation only.
    {
        let mut dst = [0i8; 1];
        copy_ccast(&[255u8], &mut dst);
        assert_eq!(dst[0], -1);
    }

    // f64 -> i32 through `copy_convert`: fractional part is dropped.
    {
        let mut dst = [0i32; 1];
        copy_convert(&[42.9f64], &mut dst);
        assert_eq!(dst[0], 42, "f64 -> i32 truncates toward zero");
    }
}
2911}