1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread;
4
5use asyn_rs::error::AsynResult;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::runtime::config::RuntimeConfig;
8use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
9use asyn_rs::user::AsynUser;
10
11use asyn_rs::port_handle::PortHandle;
12
13use crate::ndarray::NDArray;
14use crate::ndarray_pool::NDArrayPool;
15use crate::params::ndarray_driver::NDArrayDriverParams;
16
17use super::channel::{
18 BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
19};
20use super::params::PluginBaseParams;
21use super::wiring::WiringRegistry;
22
/// Value carried by a parameter-change notification sent from the port
/// driver's write handlers to the plugin data thread.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// A 32-bit integer write.
    Int32(i32),
    /// A 64-bit floating point write.
    Float64(f64),
    /// A string (octet) write.
    Octet(String),
}

impl ParamChangeValue {
    /// Returns the value as an `i32`.
    ///
    /// Floats are converted with an `as` cast; string values yield `0`.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(n) => n,
            Self::Float64(x) => x as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Returns the value as an `f64`.
    ///
    /// Integers are widened; string values yield `0.0`.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Float64(x) => x,
            Self::Int32(n) => f64::from(n),
            Self::Octet(_) => 0.0,
        }
    }

    /// Returns the string payload, or `None` for numeric values.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }
}
55
/// A parameter value a processor asks the runtime to write back to the port.
///
/// Updates are grouped by `addr` and applied with `set_params_and_notify`
/// when a processing result is published.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamUpdate {
    /// Write a 32-bit integer parameter.
    Int32 {
        reason: usize,
        addr: i32,
        value: i32,
    },
    /// Write a 64-bit floating point parameter.
    Float64 {
        reason: usize,
        addr: i32,
        value: f64,
    },
    /// Write a string (octet) parameter.
    Octet {
        reason: usize,
        addr: i32,
        value: String,
    },
}

impl ParamUpdate {
    /// Integer update at address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Floating point update at address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// String update at address 0.
    pub fn octet(reason: usize, value: impl Into<String>) -> Self {
        Self::octet_addr(reason, 0, value)
    }

    /// Integer update at an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 {
            reason,
            addr,
            value,
        }
    }

    /// Floating point update at an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 {
            reason,
            addr,
            value,
        }
    }

    /// String update at an explicit address.
    pub fn octet_addr(reason: usize, addr: i32, value: impl Into<String>) -> Self {
        Self::Octet {
            reason,
            addr,
            value: value.into(),
        }
    }
}
109
110pub struct ProcessResult {
112 pub output_arrays: Vec<Arc<NDArray>>,
113 pub param_updates: Vec<ParamUpdate>,
114 pub scatter_index: Option<usize>,
116}
117
118impl ProcessResult {
119 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
121 Self {
122 output_arrays: vec![],
123 param_updates,
124 scatter_index: None,
125 }
126 }
127
128 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
130 Self {
131 output_arrays,
132 param_updates: vec![],
133 scatter_index: None,
134 }
135 }
136
137 pub fn empty() -> Self {
139 Self {
140 output_arrays: vec![],
141 param_updates: vec![],
142 scatter_index: None,
143 }
144 }
145
146 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
148 Self {
149 output_arrays,
150 param_updates: vec![],
151 scatter_index: Some(index),
152 }
153 }
154}
155
156pub struct ParamChangeResult {
158 pub output_arrays: Vec<Arc<NDArray>>,
159 pub param_updates: Vec<ParamUpdate>,
160}
161
162impl ParamChangeResult {
163 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
164 Self {
165 output_arrays: vec![],
166 param_updates,
167 }
168 }
169
170 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
171 Self {
172 output_arrays,
173 param_updates: vec![],
174 }
175 }
176
177 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
178 Self {
179 output_arrays,
180 param_updates,
181 }
182 }
183
184 pub fn empty() -> Self {
185 Self {
186 output_arrays: vec![],
187 param_updates: vec![],
188 }
189 }
190}
191
192pub trait NDPluginProcess: Send + 'static {
194 fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
196
197 fn plugin_type(&self) -> &str;
199
200 fn register_params(
202 &mut self,
203 _base: &mut PortDriverBase,
204 ) -> Result<(), asyn_rs::error::AsynError> {
205 Ok(())
206 }
207
208 fn on_param_change(
211 &mut self,
212 _reason: usize,
213 _params: &PluginParamSnapshot,
214 ) -> ParamChangeResult {
215 ParamChangeResult::empty()
216 }
217
218 fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
222 None
223 }
224}
225
226pub struct PluginParamSnapshot {
228 pub enable_callbacks: bool,
229 pub reason: usize,
231 pub addr: i32,
233 pub value: ParamChangeValue,
235}
236
237struct SharedProcessorInner<P: NDPluginProcess> {
240 processor: P,
241 output: Arc<parking_lot::Mutex<NDArrayOutput>>,
242 pool: Arc<NDArrayPool>,
243 ndarray_params: NDArrayDriverParams,
244 plugin_params: PluginBaseParams,
245 port_handle: PortHandle,
246 array_counter: i32,
247 std_array_data_param: Option<usize>,
249}
250
251impl<P: NDPluginProcess> SharedProcessorInner<P> {
252 fn process_and_publish(&mut self, array: &NDArray) {
253 let t0 = std::time::Instant::now();
254 let result = self.processor.process_array(array, &self.pool);
255 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
256 self.publish_result(
257 result.output_arrays,
258 result.param_updates,
259 result.scatter_index,
260 Some(array),
261 elapsed_ms,
262 );
263 }
264
265 fn publish_result(
266 &mut self,
267 output_arrays: Vec<Arc<NDArray>>,
268 param_updates: Vec<ParamUpdate>,
269 scatter_index: Option<usize>,
270 fallback_array: Option<&NDArray>,
271 elapsed_ms: f64,
272 ) {
273 let output = self.output.lock();
274 for out in &output_arrays {
275 if let Some(idx) = scatter_index {
276 output.publish_to(idx, out.clone());
277 } else {
278 output.publish(out.clone());
279 }
280 }
281 drop(output);
282
283 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
284 self.array_counter += 1;
285
286 if let Some(param) = self.std_array_data_param {
289 use crate::ndarray::NDDataBuffer;
290 use asyn_rs::param::ParamValue;
291 let value = match &report_arr.data {
292 NDDataBuffer::I8(v) => {
293 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
294 }
295 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
296 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
297 ))),
298 NDDataBuffer::I16(v) => {
299 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
300 }
301 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
302 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
303 ))),
304 NDDataBuffer::I32(v) => {
305 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
306 }
307 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
308 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
309 ))),
310 NDDataBuffer::I64(v) => {
311 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
312 }
313 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
314 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
315 ))),
316 NDDataBuffer::F32(v) => {
317 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
318 }
319 NDDataBuffer::F64(v) => {
320 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
321 }
322 };
323 if let Some(value) = value {
324 let ts = report_arr.timestamp.to_system_time();
325 self.port_handle
326 .interrupts()
327 .notify(asyn_rs::interrupt::InterruptValue {
328 reason: param,
329 addr: 0,
330 value,
331 timestamp: ts,
332 });
333 }
334 }
335
336 let info = report_arr.info();
337 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
338 self.port_handle.write_int32_no_wait(
339 self.ndarray_params.array_counter,
340 0,
341 self.array_counter,
342 );
343 self.port_handle.write_int32_no_wait(
344 self.ndarray_params.unique_id,
345 0,
346 report_arr.unique_id,
347 );
348 self.port_handle.write_int32_no_wait(
349 self.ndarray_params.n_dimensions,
350 0,
351 report_arr.dims.len() as i32,
352 );
353 self.port_handle.write_int32_no_wait(
354 self.ndarray_params.array_size_x,
355 0,
356 info.x_size as i32,
357 );
358 self.port_handle.write_int32_no_wait(
359 self.ndarray_params.array_size_y,
360 0,
361 info.y_size as i32,
362 );
363 self.port_handle.write_int32_no_wait(
364 self.ndarray_params.array_size_z,
365 0,
366 info.color_size as i32,
367 );
368 self.port_handle.write_int32_no_wait(
369 self.ndarray_params.array_size,
370 0,
371 info.total_bytes as i32,
372 );
373 self.port_handle.write_int32_no_wait(
374 self.ndarray_params.data_type,
375 0,
376 report_arr.data.data_type() as i32,
377 );
378 self.port_handle
379 .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
380
381 let ts_f64 = report_arr.timestamp.as_f64();
382 self.port_handle
383 .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
384 self.port_handle.write_int32_no_wait(
385 self.ndarray_params.epics_ts_sec,
386 0,
387 report_arr.timestamp.sec as i32,
388 );
389 self.port_handle.write_int32_no_wait(
390 self.ndarray_params.epics_ts_nsec,
391 0,
392 report_arr.timestamp.nsec as i32,
393 );
394 }
395
396 self.port_handle
397 .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
398
399 use asyn_rs::request::ParamSetValue;
402
403 let mut addr0_updates: Vec<ParamSetValue> = Vec::new();
404 let mut extra_addr_map: std::collections::HashMap<i32, Vec<ParamSetValue>> =
405 std::collections::HashMap::new();
406
407 for update in ¶m_updates {
408 match update {
409 ParamUpdate::Int32 {
410 reason,
411 addr,
412 value,
413 } => {
414 let pv = ParamSetValue::Int32 {
415 reason: *reason,
416 addr: *addr,
417 value: *value,
418 };
419 if *addr == 0 {
420 addr0_updates.push(pv);
421 } else {
422 extra_addr_map.entry(*addr).or_default().push(pv);
423 }
424 }
425 ParamUpdate::Float64 {
426 reason,
427 addr,
428 value,
429 } => {
430 let pv = ParamSetValue::Float64 {
431 reason: *reason,
432 addr: *addr,
433 value: *value,
434 };
435 if *addr == 0 {
436 addr0_updates.push(pv);
437 } else {
438 extra_addr_map.entry(*addr).or_default().push(pv);
439 }
440 }
441 ParamUpdate::Octet {
442 reason,
443 addr,
444 value,
445 } => {
446 let pv = ParamSetValue::Octet {
447 reason: *reason,
448 addr: *addr,
449 value: value.clone(),
450 };
451 if *addr == 0 {
452 addr0_updates.push(pv);
453 } else {
454 extra_addr_map.entry(*addr).or_default().push(pv);
455 }
456 }
457 }
458 }
459
460 self.port_handle.set_params_and_notify(0, addr0_updates);
461 for (addr, updates) in extra_addr_map {
462 self.port_handle.set_params_and_notify(addr, updates);
463 }
464 }
465}
466
467struct BlockingProcessorHandle<P: NDPluginProcess> {
470 inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
471}
472
473impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
474 fn process_and_publish(&self, array: &NDArray) {
475 self.inner.lock().process_and_publish(array);
476 }
477}
478
479#[allow(dead_code)]
481pub struct PluginPortDriver {
482 base: PortDriverBase,
483 ndarray_params: NDArrayDriverParams,
484 plugin_params: PluginBaseParams,
485 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
486 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
488 std_array_data_param: Option<usize>,
490}
491
492impl PluginPortDriver {
493 fn new<P: NDPluginProcess>(
494 port_name: &str,
495 plugin_type_name: &str,
496 queue_size: usize,
497 ndarray_port: &str,
498 max_addr: usize,
499 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
500 processor: &mut P,
501 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
502 ) -> AsynResult<Self> {
503 let mut base = PortDriverBase::new(
504 port_name,
505 max_addr,
506 PortFlags {
507 can_block: true,
508 ..Default::default()
509 },
510 );
511
512 let ndarray_params = NDArrayDriverParams::create(&mut base)?;
513 let plugin_params = PluginBaseParams::create(&mut base)?;
514
515 base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
517 base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
518 base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
519 base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
520 base.set_int32_param(plugin_params.queue_use, 0, 0)?;
521 base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
522 base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
523 base.set_int32_param(ndarray_params.write_file, 0, 0)?;
524 base.set_int32_param(ndarray_params.read_file, 0, 0)?;
525 base.set_int32_param(ndarray_params.capture, 0, 0)?;
526 base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
527 base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
528 base.set_string_param(ndarray_params.file_path, 0, "".into())?;
529 base.set_string_param(ndarray_params.file_name, 0, "".into())?;
530 base.set_int32_param(ndarray_params.file_number, 0, 0)?;
531 base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
532 base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
533 base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
534 base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
535 base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
536
537 base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
539 if !ndarray_port.is_empty() {
540 base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
541 }
542
543 let std_array_data_param = if array_data.is_some() {
545 Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
546 } else {
547 None
548 };
549
550 processor.register_params(&mut base)?;
552
553 Ok(Self {
554 base,
555 ndarray_params,
556 plugin_params,
557 param_change_tx,
558 array_data,
559 std_array_data_param,
560 })
561 }
562}
563
/// Copies as many leading elements of `src` as fit into `dst` and returns
/// how many were copied (`min(src.len(), dst.len())`).
fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
    let count = src.len().min(dst.len());
    let (head, _) = dst.split_at_mut(count);
    head.copy_from_slice(&src[..count]);
    count
}
570
/// Copies as many leading elements of `src` as fit into `dst`, converting
/// each from `S` to `D`, and returns how many were copied.
///
/// Integer sources are converted to integer destinations with a wrapping
/// (bit-preserving) cast, so e.g. `200u8` read as `i8` becomes `-56`. This
/// matches the `as` reinterpretation used when the same data is pushed as an
/// array interrupt. Float sources go through `f64` and the saturating `as`
/// cast. Float destinations always receive `value as f64` (then narrowed for
/// `f32`).
fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
where
    S: CastToF64 + Copy,
    D: CastFromF64 + Copy,
{
    let n = src.len().min(dst.len());
    // `zip` stops at the shorter slice, i.e. after exactly `n` elements.
    for (out, &value) in dst.iter_mut().zip(src) {
        *out = match value.cast_to_i128() {
            Some(int) => D::cast_from_i128(int),
            None => D::cast_from_f64(value.cast_to_f64()),
        };
    }
    n
}

/// Source side of [`copy_convert`].
trait CastToF64 {
    /// The value as an `f64` (plain `as` cast).
    fn cast_to_f64(self) -> f64;

    /// The exact integer value, or `None` for floating point sources.
    fn cast_to_i128(self) -> Option<i128>
    where
        Self: Sized,
    {
        None
    }
}

macro_rules! impl_cast_to_f64_for_int {
    ($($ty:ty),* $(,)?) => {$(
        impl CastToF64 for $ty {
            fn cast_to_f64(self) -> f64 {
                self as f64
            }
            fn cast_to_i128(self) -> Option<i128> {
                // i128 represents every i8..=i64 / u8..=u64 value exactly.
                Some(self as i128)
            }
        }
    )*};
}

impl_cast_to_f64_for_int!(i8, u8, i16, u16, i32, u32, i64, u64);

impl CastToF64 for f32 {
    fn cast_to_f64(self) -> f64 {
        self as f64
    }
}
impl CastToF64 for f64 {
    fn cast_to_f64(self) -> f64 {
        self
    }
}

/// Destination side of [`copy_convert`].
trait CastFromF64 {
    /// Converts from `f64` with an `as` cast (saturating for integers).
    fn cast_from_f64(v: f64) -> Self;

    /// Converts from an exact integer value. The default goes through
    /// `f64`, which is what float destinations need; integer destinations
    /// override it with a wrapping cast.
    fn cast_from_i128(v: i128) -> Self
    where
        Self: Sized,
    {
        Self::cast_from_f64(v as f64)
    }
}

macro_rules! impl_cast_from_f64_for_int {
    ($($ty:ty),* $(,)?) => {$(
        impl CastFromF64 for $ty {
            fn cast_from_f64(v: f64) -> Self {
                v as $ty
            }
            fn cast_from_i128(v: i128) -> Self {
                // Integer-to-integer `as` keeps the low bits (wraps).
                v as $ty
            }
        }
    )*};
}

impl_cast_from_f64_for_int!(i8, i16, i32);

impl CastFromF64 for f32 {
    fn cast_from_f64(v: f64) -> Self {
        v as f32
    }
}
impl CastFromF64 for f64 {
    fn cast_from_f64(v: f64) -> Self {
        v
    }
}
670
// Body of a typed `read_*_array` method.
//
// Arguments: the driver (`$self`), the destination slice (`$buf`), the
// `NDDataBuffer` variant whose element type equals the destination's
// (copied with `copy_direct`), then every other variant (each copied with
// `copy_convert`).
//
// Evaluates to `Ok(count)` with the number of elements written, or `Ok(0)`
// when the driver has no array data slot or the slot is currently empty.
macro_rules! impl_read_array {
    ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
        use crate::ndarray::NDDataBuffer;
        match $self.array_data.as_ref() {
            None => Ok(0),
            Some(slot) => {
                // Hold the slot lock for the duration of the copy.
                let latest = slot.lock();
                let copied = match latest.as_ref() {
                    None => 0,
                    Some(array) => match &array.data {
                        NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
                        $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
                    },
                };
                Ok(copied)
            }
        }
    }};
}
692
693impl PortDriver for PluginPortDriver {
694 fn base(&self) -> &PortDriverBase {
695 &self.base
696 }
697
698 fn base_mut(&mut self) -> &mut PortDriverBase {
699 &mut self.base
700 }
701
702 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
703 let reason = user.reason;
704 let addr = user.addr;
705 self.base.set_int32_param(reason, addr, value)?;
706 self.base.call_param_callbacks(addr)?;
707 let _ = self
708 .param_change_tx
709 .try_send((reason, addr, ParamChangeValue::Int32(value)));
710 Ok(())
711 }
712
713 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
714 let reason = user.reason;
715 let addr = user.addr;
716 self.base.set_float64_param(reason, addr, value)?;
717 self.base.call_param_callbacks(addr)?;
718 let _ = self
719 .param_change_tx
720 .try_send((reason, addr, ParamChangeValue::Float64(value)));
721 Ok(())
722 }
723
724 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
725 let reason = user.reason;
726 let addr = user.addr;
727 let s = String::from_utf8_lossy(data).into_owned();
728 self.base.set_string_param(reason, addr, s.clone())?;
729 self.base.call_param_callbacks(addr)?;
730 let _ = self
731 .param_change_tx
732 .try_send((reason, addr, ParamChangeValue::Octet(s)));
733 Ok(())
734 }
735
736 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
737 impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
738 }
739
740 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
741 impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
742 }
743
744 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
745 impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
746 }
747
748 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
749 impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
750 }
751
752 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
753 impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
754 }
755}
756
757#[derive(Clone)]
759pub struct PluginRuntimeHandle {
760 port_runtime: PortRuntimeHandle,
761 array_sender: NDArraySender,
762 array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
763 port_name: String,
764 pub ndarray_params: NDArrayDriverParams,
765 pub plugin_params: PluginBaseParams,
766}
767
768impl PluginRuntimeHandle {
769 pub fn port_runtime(&self) -> &PortRuntimeHandle {
770 &self.port_runtime
771 }
772
773 pub fn array_sender(&self) -> &NDArraySender {
774 &self.array_sender
775 }
776
777 pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
778 &self.array_output
779 }
780
781 pub fn port_name(&self) -> &str {
782 &self.port_name
783 }
784}
785
786pub fn create_plugin_runtime<P: NDPluginProcess>(
793 port_name: &str,
794 processor: P,
795 pool: Arc<NDArrayPool>,
796 queue_size: usize,
797 ndarray_port: &str,
798 wiring: Arc<WiringRegistry>,
799) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
800 create_plugin_runtime_multi_addr(
801 port_name,
802 processor,
803 pool,
804 queue_size,
805 ndarray_port,
806 wiring,
807 1,
808 )
809}
810
811pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
815 port_name: &str,
816 mut processor: P,
817 pool: Arc<NDArrayPool>,
818 queue_size: usize,
819 ndarray_port: &str,
820 wiring: Arc<WiringRegistry>,
821 max_addr: usize,
822) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
823 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
825
826 let plugin_type_name = processor.plugin_type().to_string();
828 let array_data = processor.array_data_handle();
829
830 let driver = PluginPortDriver::new(
832 port_name,
833 &plugin_type_name,
834 queue_size,
835 ndarray_port,
836 max_addr,
837 param_tx,
838 &mut processor,
839 array_data,
840 )
841 .expect("failed to create plugin port driver");
842
843 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
844 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
845 let ndarray_params = driver.ndarray_params;
846 let plugin_params = driver.plugin_params;
847 let std_array_data_param = driver.std_array_data_param;
848
849 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
851
852 let port_handle = port_runtime.port_handle().clone();
854
855 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
857
858 let enabled = Arc::new(AtomicBool::new(false));
860 let blocking_mode = Arc::new(AtomicBool::new(false));
861
862 let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
864 let array_output_for_handle = array_output.clone();
865 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
866 processor,
867 output: array_output,
868 pool,
869 ndarray_params,
870 plugin_params,
871 port_handle,
872 array_counter: 0,
873 std_array_data_param,
874 }));
875
876 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
878 inner: shared.clone(),
879 });
880
881 let data_enabled = enabled.clone();
882 let data_blocking = blocking_mode.clone();
883 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
884
885 let nd_array_port_reason = plugin_params.nd_array_port;
887 let sender_port_name = port_name.to_string();
888 let initial_upstream = ndarray_port.to_string();
889
890 let data_jh = thread::Builder::new()
892 .name(format!("plugin-data-{port_name}"))
893 .spawn(move || {
894 plugin_data_loop(
895 shared,
896 array_rx,
897 param_rx,
898 enable_callbacks_reason,
899 blocking_callbacks_reason,
900 data_enabled,
901 data_blocking,
902 nd_array_port_reason,
903 sender_port_name,
904 initial_upstream,
905 wiring,
906 );
907 })
908 .expect("failed to spawn plugin data thread");
909
910 let handle = PluginRuntimeHandle {
911 port_runtime,
912 array_sender,
913 array_output: array_output_for_handle,
914 port_name: port_name.to_string(),
915 ndarray_params,
916 plugin_params,
917 };
918
919 (handle, data_jh)
920}
921
922fn plugin_data_loop<P: NDPluginProcess>(
923 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
924 mut array_rx: NDArrayReceiver,
925 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
926 enable_callbacks_reason: usize,
927 blocking_callbacks_reason: usize,
928 enabled: Arc<AtomicBool>,
929 blocking_mode: Arc<AtomicBool>,
930 nd_array_port_reason: usize,
931 sender_port_name: String,
932 initial_upstream: String,
933 wiring: Arc<WiringRegistry>,
934) {
935 let mut current_upstream = initial_upstream;
936 let rt = tokio::runtime::Builder::new_current_thread()
937 .enable_all()
938 .build()
939 .unwrap();
940 rt.block_on(async {
941 loop {
942 tokio::select! {
943 msg = array_rx.recv_msg() => {
944 match msg {
945 Some(msg) => {
946 if !blocking_mode.load(Ordering::Acquire) {
949 shared.lock().process_and_publish(&msg.array);
950 }
951 }
953 None => break,
954 }
955 }
956 param = param_rx.recv() => {
957 match param {
958 Some((reason, addr, value)) => {
959 if reason == enable_callbacks_reason {
960 enabled.store(value.as_i32() != 0, Ordering::Release);
961 }
962 if reason == blocking_callbacks_reason {
963 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
964 }
965 if reason == nd_array_port_reason {
967 if let Some(new_port) = value.as_string() {
968 if new_port != current_upstream {
969 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
970 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
971 eprintln!("NDArrayPort rewire failed: {e}");
972 current_upstream = old;
973 }
974 }
975 }
976 }
977 let snapshot = PluginParamSnapshot {
978 enable_callbacks: enabled.load(Ordering::Acquire),
979 reason,
980 addr,
981 value,
982 };
983 let mut guard = shared.lock();
984 let t0 = std::time::Instant::now();
985 let result = guard.processor.on_param_change(reason, &snapshot);
986 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
987 if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
988 guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
989 }
990 drop(guard);
991 }
992 None => break,
993 }
994 }
995 }
996 }
997 });
998}
999
1000pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
1002 upstream.array_output().lock().add(downstream_sender);
1003}
1004
1005pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
1007 port_name: &str,
1008 mut processor: P,
1009 pool: Arc<NDArrayPool>,
1010 queue_size: usize,
1011 output: NDArrayOutput,
1012 ndarray_port: &str,
1013 wiring: Arc<WiringRegistry>,
1014) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
1015 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
1016
1017 let plugin_type_name = processor.plugin_type().to_string();
1018 let array_data = processor.array_data_handle();
1019 let driver = PluginPortDriver::new(
1020 port_name,
1021 &plugin_type_name,
1022 queue_size,
1023 ndarray_port,
1024 1,
1025 param_tx,
1026 &mut processor,
1027 array_data,
1028 )
1029 .expect("failed to create plugin port driver");
1030
1031 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1032 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1033 let ndarray_params = driver.ndarray_params;
1034 let plugin_params = driver.plugin_params;
1035 let std_array_data_param = driver.std_array_data_param;
1036
1037 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1038
1039 let port_handle = port_runtime.port_handle().clone();
1040
1041 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1042
1043 let enabled = Arc::new(AtomicBool::new(false));
1044 let blocking_mode = Arc::new(AtomicBool::new(false));
1045
1046 let array_output = Arc::new(parking_lot::Mutex::new(output));
1047 let array_output_for_handle = array_output.clone();
1048 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1049 processor,
1050 output: array_output,
1051 pool,
1052 ndarray_params,
1053 plugin_params,
1054 port_handle,
1055 array_counter: 0,
1056 std_array_data_param,
1057 }));
1058
1059 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1060 inner: shared.clone(),
1061 });
1062
1063 let data_enabled = enabled.clone();
1064 let data_blocking = blocking_mode.clone();
1065 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1066
1067 let nd_array_port_reason = plugin_params.nd_array_port;
1069 let sender_port_name = port_name.to_string();
1070 let initial_upstream = ndarray_port.to_string();
1071
1072 let data_jh = thread::Builder::new()
1073 .name(format!("plugin-data-{port_name}"))
1074 .spawn(move || {
1075 plugin_data_loop(
1076 shared,
1077 array_rx,
1078 param_rx,
1079 enable_callbacks_reason,
1080 blocking_callbacks_reason,
1081 data_enabled,
1082 data_blocking,
1083 nd_array_port_reason,
1084 sender_port_name,
1085 initial_upstream,
1086 wiring,
1087 );
1088 })
1089 .expect("failed to spawn plugin data thread");
1090
1091 let handle = PluginRuntimeHandle {
1092 port_runtime,
1093 array_sender,
1094 array_output: array_output_for_handle,
1095 port_name: port_name.to_string(),
1096 ndarray_params,
1097 plugin_params,
1098 };
1099
1100 (handle, data_jh)
1101}
1102
1103#[cfg(test)]
1104mod tests {
1105 use super::*;
1106 use crate::ndarray::{NDDataType, NDDimension};
1107 use crate::plugin::channel::ndarray_channel;
1108
1109 struct PassthroughProcessor;
1111
1112 impl NDPluginProcess for PassthroughProcessor {
1113 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1114 ProcessResult::arrays(vec![Arc::new(array.clone())])
1115 }
1116 fn plugin_type(&self) -> &str {
1117 "Passthrough"
1118 }
1119 }
1120
1121 struct SinkProcessor {
1123 count: usize,
1124 }
1125
1126 impl NDPluginProcess for SinkProcessor {
1127 fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1128 self.count += 1;
1129 ProcessResult::empty()
1130 }
1131 fn plugin_type(&self) -> &str {
1132 "Sink"
1133 }
1134 }
1135
1136 fn make_test_array(id: i32) -> Arc<NDArray> {
1137 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1138 arr.unique_id = id;
1139 Arc::new(arr)
1140 }
1141
1142 fn test_wiring() -> Arc<WiringRegistry> {
1143 Arc::new(WiringRegistry::new())
1144 }
1145
1146 fn enable_callbacks(handle: &PluginRuntimeHandle) {
1148 handle
1149 .port_runtime()
1150 .port_handle()
1151 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1152 .unwrap();
1153 std::thread::sleep(std::time::Duration::from_millis(10));
1154 }
1155
1156 #[test]
1157 fn test_passthrough_runtime() {
1158 let pool = Arc::new(NDArrayPool::new(1_000_000));
1159
1160 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1162 let mut output = NDArrayOutput::new();
1163 output.add(downstream_sender);
1164
1165 let (handle, _data_jh) = create_plugin_runtime_with_output(
1166 "PASS1",
1167 PassthroughProcessor,
1168 pool,
1169 10,
1170 output,
1171 "",
1172 test_wiring(),
1173 );
1174 enable_callbacks(&handle);
1175
1176 handle.array_sender().send(make_test_array(42));
1178
1179 let received = downstream_rx.blocking_recv().unwrap();
1181 assert_eq!(received.unique_id, 42);
1182 }
1183
1184 #[test]
1185 fn test_sink_runtime() {
1186 let pool = Arc::new(NDArrayPool::new(1_000_000));
1187
1188 let (handle, _data_jh) = create_plugin_runtime(
1189 "SINK1",
1190 SinkProcessor { count: 0 },
1191 pool,
1192 10,
1193 "",
1194 test_wiring(),
1195 );
1196 enable_callbacks(&handle);
1197
1198 handle.array_sender().send(make_test_array(1));
1200 handle.array_sender().send(make_test_array(2));
1201
1202 std::thread::sleep(std::time::Duration::from_millis(50));
1204
1205 assert_eq!(handle.port_name(), "SINK1");
1207 }
1208
1209 #[test]
1210 fn test_plugin_type_param() {
1211 let pool = Arc::new(NDArrayPool::new(1_000_000));
1212
1213 let (handle, _data_jh) = create_plugin_runtime(
1214 "TYPE_TEST",
1215 PassthroughProcessor,
1216 pool,
1217 10,
1218 "",
1219 test_wiring(),
1220 );
1221
1222 assert_eq!(handle.port_name(), "TYPE_TEST");
1224 assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1225 }
1226
1227 #[test]
1228 fn test_shutdown_on_handle_drop() {
1229 let pool = Arc::new(NDArrayPool::new(1_000_000));
1230
1231 let (handle, data_jh) = create_plugin_runtime(
1232 "SHUTDOWN_TEST",
1233 PassthroughProcessor,
1234 pool,
1235 10,
1236 "",
1237 test_wiring(),
1238 );
1239
1240 let sender = handle.array_sender().clone();
1242 drop(handle);
1243 drop(sender);
1244
1245 let result = data_jh.join();
1247 assert!(result.is_ok());
1248 }
1249
1250 #[test]
1251 fn test_dropped_count_when_queue_full() {
1252 let pool = Arc::new(NDArrayPool::new(1_000_000));
1253
1254 struct SlowProcessor;
1256 impl NDPluginProcess for SlowProcessor {
1257 fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1258 std::thread::sleep(std::time::Duration::from_millis(100));
1259 ProcessResult::empty()
1260 }
1261 fn plugin_type(&self) -> &str {
1262 "Slow"
1263 }
1264 }
1265
1266 let (handle, _data_jh) =
1267 create_plugin_runtime("DROP_TEST", SlowProcessor, pool, 1, "", test_wiring());
1268 enable_callbacks(&handle);
1269
1270 for i in 0..10 {
1272 handle.array_sender().send(make_test_array(i));
1273 }
1274
1275 assert!(handle.array_sender().dropped_count() > 0);
1277 }
1278
1279 #[test]
1280 fn test_blocking_callbacks_basic() {
1281 let pool = Arc::new(NDArrayPool::new(1_000_000));
1282 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1283 let mut output = NDArrayOutput::new();
1284 output.add(downstream_sender);
1285
1286 let (handle, _data_jh) = create_plugin_runtime_with_output(
1287 "BLOCK_TEST",
1288 PassthroughProcessor,
1289 pool,
1290 10,
1291 output,
1292 "",
1293 test_wiring(),
1294 );
1295 enable_callbacks(&handle);
1296
1297 handle
1299 .port_runtime()
1300 .port_handle()
1301 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1302 .unwrap();
1303 std::thread::sleep(std::time::Duration::from_millis(50));
1304
1305 handle.array_sender().send(make_test_array(42));
1307
1308 let received = downstream_rx.blocking_recv().unwrap();
1310 assert_eq!(received.unique_id, 42);
1311 }
1312
1313 #[test]
1314 fn test_blocking_to_nonblocking_switch() {
1315 let pool = Arc::new(NDArrayPool::new(1_000_000));
1316 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1317 let mut output = NDArrayOutput::new();
1318 output.add(downstream_sender);
1319
1320 let (handle, _data_jh) = create_plugin_runtime_with_output(
1321 "SWITCH_TEST",
1322 PassthroughProcessor,
1323 pool,
1324 10,
1325 output,
1326 "",
1327 test_wiring(),
1328 );
1329 enable_callbacks(&handle);
1330
1331 handle
1333 .port_runtime()
1334 .port_handle()
1335 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1336 .unwrap();
1337 std::thread::sleep(std::time::Duration::from_millis(50));
1338
1339 handle.array_sender().send(make_test_array(1));
1340 let received = downstream_rx.blocking_recv().unwrap();
1341 assert_eq!(received.unique_id, 1);
1342
1343 handle
1345 .port_runtime()
1346 .port_handle()
1347 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1348 .unwrap();
1349 std::thread::sleep(std::time::Duration::from_millis(50));
1350
1351 handle.array_sender().send(make_test_array(2));
1353 let received = downstream_rx.blocking_recv().unwrap();
1354 assert_eq!(received.unique_id, 2);
1355 }
1356
1357 #[test]
1358 fn test_enable_callbacks_disables_processing() {
1359 let pool = Arc::new(NDArrayPool::new(1_000_000));
1360 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1361 let mut output = NDArrayOutput::new();
1362 output.add(downstream_sender);
1363
1364 let (handle, _data_jh) = create_plugin_runtime_with_output(
1365 "ENABLE_TEST",
1366 PassthroughProcessor,
1367 pool,
1368 10,
1369 output,
1370 "",
1371 test_wiring(),
1372 );
1373
1374 handle
1376 .port_runtime()
1377 .port_handle()
1378 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1379 .unwrap();
1380 std::thread::sleep(std::time::Duration::from_millis(50));
1381
1382 handle.array_sender().send(make_test_array(99));
1384
1385 let rt = tokio::runtime::Builder::new_current_thread()
1387 .enable_all()
1388 .build()
1389 .unwrap();
1390 let result = rt.block_on(async {
1391 tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1392 });
1393 assert!(
1394 result.is_err(),
1395 "should not receive array when callbacks disabled"
1396 );
1397 }
1398
1399 #[test]
1400 fn test_blocking_downstream_receives() {
1401 let pool = Arc::new(NDArrayPool::new(1_000_000));
1402
1403 let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1404 let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1405 let mut output = NDArrayOutput::new();
1406 output.add(ds1);
1407 output.add(ds2);
1408
1409 let (handle, _data_jh) = create_plugin_runtime_with_output(
1410 "BLOCK_DS_TEST",
1411 PassthroughProcessor,
1412 pool,
1413 10,
1414 output,
1415 "",
1416 test_wiring(),
1417 );
1418 enable_callbacks(&handle);
1419
1420 handle
1422 .port_runtime()
1423 .port_handle()
1424 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1425 .unwrap();
1426 std::thread::sleep(std::time::Duration::from_millis(50));
1427
1428 handle.array_sender().send(make_test_array(77));
1429
1430 let r1 = rx1.blocking_recv().unwrap();
1432 let r2 = rx2.blocking_recv().unwrap();
1433 assert_eq!(r1.unique_id, 77);
1434 assert_eq!(r2.unique_id, 77);
1435 }
1436
1437 #[test]
1438 fn test_blocking_param_updates() {
1439 let pool = Arc::new(NDArrayPool::new(1_000_000));
1440
1441 struct ParamTracker;
1442 impl NDPluginProcess for ParamTracker {
1443 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1444 ProcessResult::arrays(vec![Arc::new(array.clone())])
1445 }
1446 fn plugin_type(&self) -> &str {
1447 "ParamTracker"
1448 }
1449 }
1450
1451 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1452 let mut output = NDArrayOutput::new();
1453 output.add(downstream_sender);
1454
1455 let (handle, _data_jh) = create_plugin_runtime_with_output(
1456 "PARAM_TEST",
1457 ParamTracker,
1458 pool,
1459 10,
1460 output,
1461 "",
1462 test_wiring(),
1463 );
1464 enable_callbacks(&handle);
1465
1466 handle
1468 .port_runtime()
1469 .port_handle()
1470 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1471 .unwrap();
1472 std::thread::sleep(std::time::Duration::from_millis(50));
1473
1474 handle.array_sender().send(make_test_array(1));
1476 let received = downstream_rx.blocking_recv().unwrap();
1477 assert_eq!(received.unique_id, 1);
1478
1479 handle
1481 .port_runtime()
1482 .port_handle()
1483 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1484 .unwrap();
1485 std::thread::sleep(std::time::Duration::from_millis(50));
1486
1487 handle.array_sender().send(make_test_array(2));
1489 let received = downstream_rx.blocking_recv().unwrap();
1490 assert_eq!(received.unique_id, 2);
1491 }
1492
1493 #[test]
1495 fn test_no_panic_in_current_thread_runtime() {
1496 let pool = Arc::new(NDArrayPool::new(1_000_000));
1497 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1498 let mut output = NDArrayOutput::new();
1499 output.add(downstream_sender);
1500
1501 let (handle, _data_jh) = create_plugin_runtime_with_output(
1502 "CURRENT_THREAD_TEST",
1503 PassthroughProcessor,
1504 pool,
1505 10,
1506 output,
1507 "",
1508 test_wiring(),
1509 );
1510 enable_callbacks(&handle);
1511
1512 handle
1514 .port_runtime()
1515 .port_handle()
1516 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1517 .unwrap();
1518 std::thread::sleep(std::time::Duration::from_millis(50));
1519
1520 let rt = tokio::runtime::Builder::new_current_thread()
1522 .enable_all()
1523 .build()
1524 .unwrap();
1525 rt.block_on(async {
1526 handle.array_sender().send(make_test_array(99));
1527 });
1528
1529 let received = downstream_rx.blocking_recv().unwrap();
1530 assert_eq!(received.unique_id, 99);
1531 }
1532}