1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread;
4
5use asyn_rs::error::AsynResult;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::runtime::config::RuntimeConfig;
8use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
9use asyn_rs::user::AsynUser;
10
11use asyn_rs::port_handle::PortHandle;
12
13use crate::ndarray::NDArray;
14use crate::ndarray_pool::NDArrayPool;
15use crate::params::ndarray_driver::NDArrayDriverParams;
16
17use super::channel::{
18 BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender, ndarray_channel,
19};
20use super::params::PluginBaseParams;
21use super::wiring::WiringRegistry;
22
23#[derive(Debug, Clone)]
25pub enum ParamChangeValue {
26 Int32(i32),
27 Float64(f64),
28 Octet(String),
29}
30
31impl ParamChangeValue {
32 pub fn as_i32(&self) -> i32 {
33 match self {
34 ParamChangeValue::Int32(v) => *v,
35 ParamChangeValue::Float64(v) => *v as i32,
36 ParamChangeValue::Octet(_) => 0,
37 }
38 }
39
40 pub fn as_f64(&self) -> f64 {
41 match self {
42 ParamChangeValue::Int32(v) => *v as f64,
43 ParamChangeValue::Float64(v) => *v,
44 ParamChangeValue::Octet(_) => 0.0,
45 }
46 }
47
48 pub fn as_string(&self) -> Option<&str> {
49 match self {
50 ParamChangeValue::Octet(s) => Some(s),
51 _ => None,
52 }
53 }
54}
55
56pub enum ParamUpdate {
58 Int32 {
59 reason: usize,
60 addr: i32,
61 value: i32,
62 },
63 Float64 {
64 reason: usize,
65 addr: i32,
66 value: f64,
67 },
68 Octet {
69 reason: usize,
70 addr: i32,
71 value: String,
72 },
73}
74
75impl ParamUpdate {
76 pub fn int32(reason: usize, value: i32) -> Self {
78 Self::Int32 {
79 reason,
80 addr: 0,
81 value,
82 }
83 }
84 pub fn float64(reason: usize, value: f64) -> Self {
86 Self::Float64 {
87 reason,
88 addr: 0,
89 value,
90 }
91 }
92 pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
94 Self::Int32 {
95 reason,
96 addr,
97 value,
98 }
99 }
100 pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
102 Self::Float64 {
103 reason,
104 addr,
105 value,
106 }
107 }
108}
109
110pub struct ProcessResult {
112 pub output_arrays: Vec<Arc<NDArray>>,
113 pub param_updates: Vec<ParamUpdate>,
114 pub scatter_index: Option<usize>,
116}
117
118impl ProcessResult {
119 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
121 Self {
122 output_arrays: vec![],
123 param_updates,
124 scatter_index: None,
125 }
126 }
127
128 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
130 Self {
131 output_arrays,
132 param_updates: vec![],
133 scatter_index: None,
134 }
135 }
136
137 pub fn empty() -> Self {
139 Self {
140 output_arrays: vec![],
141 param_updates: vec![],
142 scatter_index: None,
143 }
144 }
145
146 pub fn scatter(output_arrays: Vec<Arc<NDArray>>, index: usize) -> Self {
148 Self {
149 output_arrays,
150 param_updates: vec![],
151 scatter_index: Some(index),
152 }
153 }
154}
155
156pub struct ParamChangeResult {
158 pub output_arrays: Vec<Arc<NDArray>>,
159 pub param_updates: Vec<ParamUpdate>,
160}
161
162impl ParamChangeResult {
163 pub fn updates(param_updates: Vec<ParamUpdate>) -> Self {
164 Self {
165 output_arrays: vec![],
166 param_updates,
167 }
168 }
169
170 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
171 Self {
172 output_arrays,
173 param_updates: vec![],
174 }
175 }
176
177 pub fn combined(output_arrays: Vec<Arc<NDArray>>, param_updates: Vec<ParamUpdate>) -> Self {
178 Self {
179 output_arrays,
180 param_updates,
181 }
182 }
183
184 pub fn empty() -> Self {
185 Self {
186 output_arrays: vec![],
187 param_updates: vec![],
188 }
189 }
190}
191
192pub trait NDPluginProcess: Send + 'static {
194 fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;
196
197 fn plugin_type(&self) -> &str;
199
200 fn register_params(
202 &mut self,
203 _base: &mut PortDriverBase,
204 ) -> Result<(), asyn_rs::error::AsynError> {
205 Ok(())
206 }
207
208 fn on_param_change(
211 &mut self,
212 _reason: usize,
213 _params: &PluginParamSnapshot,
214 ) -> ParamChangeResult {
215 ParamChangeResult::empty()
216 }
217
218 fn array_data_handle(&self) -> Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>> {
222 None
223 }
224}
225
226pub struct PluginParamSnapshot {
228 pub enable_callbacks: bool,
229 pub reason: usize,
231 pub addr: i32,
233 pub value: ParamChangeValue,
235}
236
237struct SharedProcessorInner<P: NDPluginProcess> {
240 processor: P,
241 output: Arc<parking_lot::Mutex<NDArrayOutput>>,
242 pool: Arc<NDArrayPool>,
243 ndarray_params: NDArrayDriverParams,
244 plugin_params: PluginBaseParams,
245 port_handle: PortHandle,
246 array_counter: i32,
247 std_array_data_param: Option<usize>,
249}
250
251impl<P: NDPluginProcess> SharedProcessorInner<P> {
252 fn process_and_publish(&mut self, array: &NDArray) {
253 let t0 = std::time::Instant::now();
254 let result = self.processor.process_array(array, &self.pool);
255 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
256 self.publish_result(
257 result.output_arrays,
258 result.param_updates,
259 result.scatter_index,
260 Some(array),
261 elapsed_ms,
262 );
263 }
264
265 fn publish_result(
266 &mut self,
267 output_arrays: Vec<Arc<NDArray>>,
268 param_updates: Vec<ParamUpdate>,
269 scatter_index: Option<usize>,
270 fallback_array: Option<&NDArray>,
271 elapsed_ms: f64,
272 ) {
273 let output = self.output.lock();
274 for out in &output_arrays {
275 if let Some(idx) = scatter_index {
276 output.publish_to(idx, out.clone());
277 } else {
278 output.publish(out.clone());
279 }
280 }
281 drop(output);
282
283 if let Some(report_arr) = output_arrays.first().map(|a| a.as_ref()).or(fallback_array) {
284 self.array_counter += 1;
285
286 if let Some(param) = self.std_array_data_param {
289 use crate::ndarray::NDDataBuffer;
290 use asyn_rs::param::ParamValue;
291 let value = match &report_arr.data {
292 NDDataBuffer::I8(v) => {
293 Some(ParamValue::Int8Array(std::sync::Arc::from(v.as_slice())))
294 }
295 NDDataBuffer::U8(v) => Some(ParamValue::Int8Array(std::sync::Arc::from(
296 v.iter().map(|&x| x as i8).collect::<Vec<_>>().as_slice(),
297 ))),
298 NDDataBuffer::I16(v) => {
299 Some(ParamValue::Int16Array(std::sync::Arc::from(v.as_slice())))
300 }
301 NDDataBuffer::U16(v) => Some(ParamValue::Int16Array(std::sync::Arc::from(
302 v.iter().map(|&x| x as i16).collect::<Vec<_>>().as_slice(),
303 ))),
304 NDDataBuffer::I32(v) => {
305 Some(ParamValue::Int32Array(std::sync::Arc::from(v.as_slice())))
306 }
307 NDDataBuffer::U32(v) => Some(ParamValue::Int32Array(std::sync::Arc::from(
308 v.iter().map(|&x| x as i32).collect::<Vec<_>>().as_slice(),
309 ))),
310 NDDataBuffer::I64(v) => {
311 Some(ParamValue::Int64Array(std::sync::Arc::from(v.as_slice())))
312 }
313 NDDataBuffer::U64(v) => Some(ParamValue::Int64Array(std::sync::Arc::from(
314 v.iter().map(|&x| x as i64).collect::<Vec<_>>().as_slice(),
315 ))),
316 NDDataBuffer::F32(v) => {
317 Some(ParamValue::Float32Array(std::sync::Arc::from(v.as_slice())))
318 }
319 NDDataBuffer::F64(v) => {
320 Some(ParamValue::Float64Array(std::sync::Arc::from(v.as_slice())))
321 }
322 };
323 if let Some(value) = value {
324 let ts = report_arr.timestamp.to_system_time();
325 self.port_handle
326 .interrupts()
327 .notify(asyn_rs::interrupt::InterruptValue {
328 reason: param,
329 addr: 0,
330 value,
331 timestamp: ts,
332 });
333 }
334 }
335
336 let info = report_arr.info();
337 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
338 self.port_handle.write_int32_no_wait(
339 self.ndarray_params.array_counter,
340 0,
341 self.array_counter,
342 );
343 self.port_handle.write_int32_no_wait(
344 self.ndarray_params.unique_id,
345 0,
346 report_arr.unique_id,
347 );
348 self.port_handle.write_int32_no_wait(
349 self.ndarray_params.n_dimensions,
350 0,
351 report_arr.dims.len() as i32,
352 );
353 self.port_handle.write_int32_no_wait(
354 self.ndarray_params.array_size_x,
355 0,
356 info.x_size as i32,
357 );
358 self.port_handle.write_int32_no_wait(
359 self.ndarray_params.array_size_y,
360 0,
361 info.y_size as i32,
362 );
363 self.port_handle.write_int32_no_wait(
364 self.ndarray_params.array_size_z,
365 0,
366 info.color_size as i32,
367 );
368 self.port_handle.write_int32_no_wait(
369 self.ndarray_params.array_size,
370 0,
371 info.total_bytes as i32,
372 );
373 self.port_handle.write_int32_no_wait(
374 self.ndarray_params.data_type,
375 0,
376 report_arr.data.data_type() as i32,
377 );
378 self.port_handle
379 .write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
380
381 let ts_f64 = report_arr.timestamp.as_f64();
382 self.port_handle
383 .write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
384 self.port_handle.write_int32_no_wait(
385 self.ndarray_params.epics_ts_sec,
386 0,
387 report_arr.timestamp.sec as i32,
388 );
389 self.port_handle.write_int32_no_wait(
390 self.ndarray_params.epics_ts_nsec,
391 0,
392 report_arr.timestamp.nsec as i32,
393 );
394 }
395
396 self.port_handle
397 .write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
398
399 let mut extra_addrs: Vec<i32> = Vec::new();
401 for update in ¶m_updates {
402 match update {
403 ParamUpdate::Int32 {
404 reason,
405 addr,
406 value,
407 } => {
408 self.port_handle.write_int32_no_wait(*reason, *addr, *value);
409 if *addr != 0 && !extra_addrs.contains(addr) {
410 extra_addrs.push(*addr);
411 }
412 }
413 ParamUpdate::Float64 {
414 reason,
415 addr,
416 value,
417 } => {
418 self.port_handle
419 .write_float64_no_wait(*reason, *addr, *value);
420 if *addr != 0 && !extra_addrs.contains(addr) {
421 extra_addrs.push(*addr);
422 }
423 }
424 ParamUpdate::Octet {
425 reason,
426 addr,
427 value,
428 } => {
429 let data = value.as_bytes().to_vec();
430 let user = asyn_rs::user::AsynUser::new(*reason).with_addr(*addr);
431 self.port_handle
432 .submit_no_wait(asyn_rs::request::RequestOp::OctetWrite { data }, user);
433 }
434 }
435 }
436
437 self.port_handle.call_param_callbacks_no_wait(0);
438 for addr in extra_addrs {
439 self.port_handle.call_param_callbacks_no_wait(addr);
440 }
441 }
442}
443
444struct BlockingProcessorHandle<P: NDPluginProcess> {
447 inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
448}
449
450impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
451 fn process_and_publish(&self, array: &NDArray) {
452 self.inner.lock().process_and_publish(array);
453 }
454}
455
456#[allow(dead_code)]
458pub struct PluginPortDriver {
459 base: PortDriverBase,
460 ndarray_params: NDArrayDriverParams,
461 plugin_params: PluginBaseParams,
462 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
463 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
465 std_array_data_param: Option<usize>,
467}
468
469impl PluginPortDriver {
470 fn new<P: NDPluginProcess>(
471 port_name: &str,
472 plugin_type_name: &str,
473 queue_size: usize,
474 ndarray_port: &str,
475 max_addr: usize,
476 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
477 processor: &mut P,
478 array_data: Option<Arc<parking_lot::Mutex<Option<Arc<NDArray>>>>>,
479 ) -> AsynResult<Self> {
480 let mut base = PortDriverBase::new(
481 port_name,
482 max_addr,
483 PortFlags {
484 can_block: true,
485 ..Default::default()
486 },
487 );
488
489 let ndarray_params = NDArrayDriverParams::create(&mut base)?;
490 let plugin_params = PluginBaseParams::create(&mut base)?;
491
492 base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
494 base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
495 base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
496 base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
497 base.set_int32_param(plugin_params.queue_use, 0, 0)?;
498 base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
499 base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
500 base.set_int32_param(ndarray_params.write_file, 0, 0)?;
501 base.set_int32_param(ndarray_params.read_file, 0, 0)?;
502 base.set_int32_param(ndarray_params.capture, 0, 0)?;
503 base.set_int32_param(ndarray_params.file_write_status, 0, 0)?;
504 base.set_string_param(ndarray_params.file_write_message, 0, "".into())?;
505 base.set_string_param(ndarray_params.file_path, 0, "".into())?;
506 base.set_string_param(ndarray_params.file_name, 0, "".into())?;
507 base.set_int32_param(ndarray_params.file_number, 0, 0)?;
508 base.set_int32_param(ndarray_params.auto_increment, 0, 0)?;
509 base.set_string_param(ndarray_params.file_template, 0, "%s%s_%3.3d.dat".into())?;
510 base.set_string_param(ndarray_params.full_file_name, 0, "".into())?;
511 base.set_int32_param(ndarray_params.create_dir, 0, 0)?;
512 base.set_string_param(ndarray_params.temp_suffix, 0, "".into())?;
513
514 base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
516 if !ndarray_port.is_empty() {
517 base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
518 }
519
520 let std_array_data_param = if array_data.is_some() {
522 Some(base.create_param("STD_ARRAY_DATA", asyn_rs::param::ParamType::GenericPointer)?)
523 } else {
524 None
525 };
526
527 processor.register_params(&mut base)?;
529
530 Ok(Self {
531 base,
532 ndarray_params,
533 plugin_params,
534 param_change_tx,
535 array_data,
536 std_array_data_param,
537 })
538 }
539}
540
541fn copy_direct<T: Copy>(src: &[T], dst: &mut [T]) -> usize {
543 let n = src.len().min(dst.len());
544 dst[..n].copy_from_slice(&src[..n]);
545 n
546}
547
548fn copy_convert<S, D>(src: &[S], dst: &mut [D]) -> usize
550where
551 S: CastToF64 + Copy,
552 D: CastFromF64 + Copy,
553{
554 let n = src.len().min(dst.len());
555 for i in 0..n {
556 dst[i] = D::cast_from_f64(src[i].cast_to_f64());
557 }
558 n
559}
560
561trait CastToF64 {
563 fn cast_to_f64(self) -> f64;
564}
565
566impl CastToF64 for i8 {
567 fn cast_to_f64(self) -> f64 {
568 self as f64
569 }
570}
571impl CastToF64 for u8 {
572 fn cast_to_f64(self) -> f64 {
573 self as f64
574 }
575}
576impl CastToF64 for i16 {
577 fn cast_to_f64(self) -> f64 {
578 self as f64
579 }
580}
581impl CastToF64 for u16 {
582 fn cast_to_f64(self) -> f64 {
583 self as f64
584 }
585}
586impl CastToF64 for i32 {
587 fn cast_to_f64(self) -> f64 {
588 self as f64
589 }
590}
591impl CastToF64 for u32 {
592 fn cast_to_f64(self) -> f64 {
593 self as f64
594 }
595}
596impl CastToF64 for i64 {
597 fn cast_to_f64(self) -> f64 {
598 self as f64
599 }
600}
601impl CastToF64 for u64 {
602 fn cast_to_f64(self) -> f64 {
603 self as f64
604 }
605}
606impl CastToF64 for f32 {
607 fn cast_to_f64(self) -> f64 {
608 self as f64
609 }
610}
611impl CastToF64 for f64 {
612 fn cast_to_f64(self) -> f64 {
613 self
614 }
615}
616
617trait CastFromF64 {
619 fn cast_from_f64(v: f64) -> Self;
620}
621
622impl CastFromF64 for i8 {
623 fn cast_from_f64(v: f64) -> Self {
624 v as i8
625 }
626}
627impl CastFromF64 for i16 {
628 fn cast_from_f64(v: f64) -> Self {
629 v as i16
630 }
631}
632impl CastFromF64 for i32 {
633 fn cast_from_f64(v: f64) -> Self {
634 v as i32
635 }
636}
637impl CastFromF64 for f32 {
638 fn cast_from_f64(v: f64) -> Self {
639 v as f32
640 }
641}
642impl CastFromF64 for f64 {
643 fn cast_from_f64(v: f64) -> Self {
644 v
645 }
646}
647
648macro_rules! impl_read_array {
651 ($self:expr, $buf:expr, $direct_variant:ident, $( $variant:ident ),*) => {{
652 use crate::ndarray::NDDataBuffer;
653 let handle = match &$self.array_data {
654 Some(h) => h,
655 None => return Ok(0),
656 };
657 let guard = handle.lock();
658 let array = match &*guard {
659 Some(a) => a,
660 None => return Ok(0),
661 };
662 let n = match &array.data {
663 NDDataBuffer::$direct_variant(v) => copy_direct(v, $buf),
664 $( NDDataBuffer::$variant(v) => copy_convert(v, $buf), )*
665 };
666 Ok(n)
667 }};
668}
669
670impl PortDriver for PluginPortDriver {
671 fn base(&self) -> &PortDriverBase {
672 &self.base
673 }
674
675 fn base_mut(&mut self) -> &mut PortDriverBase {
676 &mut self.base
677 }
678
679 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
680 let reason = user.reason;
681 let addr = user.addr;
682 self.base.set_int32_param(reason, addr, value)?;
683 self.base.call_param_callbacks(addr)?;
684 let _ = self
685 .param_change_tx
686 .try_send((reason, addr, ParamChangeValue::Int32(value)));
687 Ok(())
688 }
689
690 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
691 let reason = user.reason;
692 let addr = user.addr;
693 self.base.set_float64_param(reason, addr, value)?;
694 self.base.call_param_callbacks(addr)?;
695 let _ = self
696 .param_change_tx
697 .try_send((reason, addr, ParamChangeValue::Float64(value)));
698 Ok(())
699 }
700
701 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
702 let reason = user.reason;
703 let addr = user.addr;
704 let s = String::from_utf8_lossy(data).into_owned();
705 self.base.set_string_param(reason, addr, s.clone())?;
706 self.base.call_param_callbacks(addr)?;
707 let _ = self
708 .param_change_tx
709 .try_send((reason, addr, ParamChangeValue::Octet(s)));
710 Ok(())
711 }
712
713 fn read_int8_array(&mut self, _user: &AsynUser, buf: &mut [i8]) -> AsynResult<usize> {
714 impl_read_array!(self, buf, I8, U8, I16, U16, I32, U32, I64, U64, F32, F64)
715 }
716
717 fn read_int16_array(&mut self, _user: &AsynUser, buf: &mut [i16]) -> AsynResult<usize> {
718 impl_read_array!(self, buf, I16, I8, U8, U16, I32, U32, I64, U64, F32, F64)
719 }
720
721 fn read_int32_array(&mut self, _user: &AsynUser, buf: &mut [i32]) -> AsynResult<usize> {
722 impl_read_array!(self, buf, I32, I8, U8, I16, U16, U32, I64, U64, F32, F64)
723 }
724
725 fn read_float32_array(&mut self, _user: &AsynUser, buf: &mut [f32]) -> AsynResult<usize> {
726 impl_read_array!(self, buf, F32, I8, U8, I16, U16, I32, U32, I64, U64, F64)
727 }
728
729 fn read_float64_array(&mut self, _user: &AsynUser, buf: &mut [f64]) -> AsynResult<usize> {
730 impl_read_array!(self, buf, F64, I8, U8, I16, U16, I32, U32, I64, U64, F32)
731 }
732}
733
734#[derive(Clone)]
736pub struct PluginRuntimeHandle {
737 port_runtime: PortRuntimeHandle,
738 array_sender: NDArraySender,
739 array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
740 port_name: String,
741 pub ndarray_params: NDArrayDriverParams,
742 pub plugin_params: PluginBaseParams,
743}
744
745impl PluginRuntimeHandle {
746 pub fn port_runtime(&self) -> &PortRuntimeHandle {
747 &self.port_runtime
748 }
749
750 pub fn array_sender(&self) -> &NDArraySender {
751 &self.array_sender
752 }
753
754 pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
755 &self.array_output
756 }
757
758 pub fn port_name(&self) -> &str {
759 &self.port_name
760 }
761}
762
763pub fn create_plugin_runtime<P: NDPluginProcess>(
770 port_name: &str,
771 processor: P,
772 pool: Arc<NDArrayPool>,
773 queue_size: usize,
774 ndarray_port: &str,
775 wiring: Arc<WiringRegistry>,
776) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
777 create_plugin_runtime_multi_addr(
778 port_name,
779 processor,
780 pool,
781 queue_size,
782 ndarray_port,
783 wiring,
784 1,
785 )
786}
787
788pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
792 port_name: &str,
793 mut processor: P,
794 pool: Arc<NDArrayPool>,
795 queue_size: usize,
796 ndarray_port: &str,
797 wiring: Arc<WiringRegistry>,
798 max_addr: usize,
799) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
800 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
802
803 let plugin_type_name = processor.plugin_type().to_string();
805 let array_data = processor.array_data_handle();
806
807 let driver = PluginPortDriver::new(
809 port_name,
810 &plugin_type_name,
811 queue_size,
812 ndarray_port,
813 max_addr,
814 param_tx,
815 &mut processor,
816 array_data,
817 )
818 .expect("failed to create plugin port driver");
819
820 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
821 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
822 let ndarray_params = driver.ndarray_params;
823 let plugin_params = driver.plugin_params;
824 let std_array_data_param = driver.std_array_data_param;
825
826 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
828
829 let port_handle = port_runtime.port_handle().clone();
831
832 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
834
835 let enabled = Arc::new(AtomicBool::new(false));
837 let blocking_mode = Arc::new(AtomicBool::new(false));
838
839 let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
841 let array_output_for_handle = array_output.clone();
842 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
843 processor,
844 output: array_output,
845 pool,
846 ndarray_params,
847 plugin_params,
848 port_handle,
849 array_counter: 0,
850 std_array_data_param,
851 }));
852
853 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
855 inner: shared.clone(),
856 });
857
858 let data_enabled = enabled.clone();
859 let data_blocking = blocking_mode.clone();
860 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
861
862 let nd_array_port_reason = plugin_params.nd_array_port;
864 let sender_port_name = port_name.to_string();
865 let initial_upstream = ndarray_port.to_string();
866
867 let data_jh = thread::Builder::new()
869 .name(format!("plugin-data-{port_name}"))
870 .spawn(move || {
871 plugin_data_loop(
872 shared,
873 array_rx,
874 param_rx,
875 enable_callbacks_reason,
876 blocking_callbacks_reason,
877 data_enabled,
878 data_blocking,
879 nd_array_port_reason,
880 sender_port_name,
881 initial_upstream,
882 wiring,
883 );
884 })
885 .expect("failed to spawn plugin data thread");
886
887 let handle = PluginRuntimeHandle {
888 port_runtime,
889 array_sender,
890 array_output: array_output_for_handle,
891 port_name: port_name.to_string(),
892 ndarray_params,
893 plugin_params,
894 };
895
896 (handle, data_jh)
897}
898
899fn plugin_data_loop<P: NDPluginProcess>(
900 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
901 mut array_rx: NDArrayReceiver,
902 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
903 enable_callbacks_reason: usize,
904 blocking_callbacks_reason: usize,
905 enabled: Arc<AtomicBool>,
906 blocking_mode: Arc<AtomicBool>,
907 nd_array_port_reason: usize,
908 sender_port_name: String,
909 initial_upstream: String,
910 wiring: Arc<WiringRegistry>,
911) {
912 let mut current_upstream = initial_upstream;
913 let rt = tokio::runtime::Builder::new_current_thread()
914 .enable_all()
915 .build()
916 .unwrap();
917 rt.block_on(async {
918 loop {
919 tokio::select! {
920 msg = array_rx.recv_msg() => {
921 match msg {
922 Some(msg) => {
923 if !blocking_mode.load(Ordering::Acquire) {
926 shared.lock().process_and_publish(&msg.array);
927 }
928 }
930 None => break,
931 }
932 }
933 param = param_rx.recv() => {
934 match param {
935 Some((reason, addr, value)) => {
936 if reason == enable_callbacks_reason {
937 enabled.store(value.as_i32() != 0, Ordering::Release);
938 }
939 if reason == blocking_callbacks_reason {
940 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
941 }
942 if reason == nd_array_port_reason {
944 if let Some(new_port) = value.as_string() {
945 if new_port != current_upstream {
946 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
947 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
948 eprintln!("NDArrayPort rewire failed: {e}");
949 current_upstream = old;
950 }
951 }
952 }
953 }
954 let snapshot = PluginParamSnapshot {
955 enable_callbacks: enabled.load(Ordering::Acquire),
956 reason,
957 addr,
958 value,
959 };
960 let mut guard = shared.lock();
961 let t0 = std::time::Instant::now();
962 let result = guard.processor.on_param_change(reason, &snapshot);
963 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
964 if !result.output_arrays.is_empty() || !result.param_updates.is_empty() {
965 guard.publish_result(result.output_arrays, result.param_updates, None, None, elapsed_ms);
966 }
967 drop(guard);
968 }
969 None => break,
970 }
971 }
972 }
973 }
974 });
975}
976
977pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
979 upstream.array_output().lock().add(downstream_sender);
980}
981
982pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
984 port_name: &str,
985 mut processor: P,
986 pool: Arc<NDArrayPool>,
987 queue_size: usize,
988 output: NDArrayOutput,
989 ndarray_port: &str,
990 wiring: Arc<WiringRegistry>,
991) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
992 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
993
994 let plugin_type_name = processor.plugin_type().to_string();
995 let array_data = processor.array_data_handle();
996 let driver = PluginPortDriver::new(
997 port_name,
998 &plugin_type_name,
999 queue_size,
1000 ndarray_port,
1001 1,
1002 param_tx,
1003 &mut processor,
1004 array_data,
1005 )
1006 .expect("failed to create plugin port driver");
1007
1008 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
1009 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
1010 let ndarray_params = driver.ndarray_params;
1011 let plugin_params = driver.plugin_params;
1012 let std_array_data_param = driver.std_array_data_param;
1013
1014 let (port_runtime, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
1015
1016 let port_handle = port_runtime.port_handle().clone();
1017
1018 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
1019
1020 let enabled = Arc::new(AtomicBool::new(false));
1021 let blocking_mode = Arc::new(AtomicBool::new(false));
1022
1023 let array_output = Arc::new(parking_lot::Mutex::new(output));
1024 let array_output_for_handle = array_output.clone();
1025 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
1026 processor,
1027 output: array_output,
1028 pool,
1029 ndarray_params,
1030 plugin_params,
1031 port_handle,
1032 array_counter: 0,
1033 std_array_data_param,
1034 }));
1035
1036 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
1037 inner: shared.clone(),
1038 });
1039
1040 let data_enabled = enabled.clone();
1041 let data_blocking = blocking_mode.clone();
1042 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
1043
1044 let nd_array_port_reason = plugin_params.nd_array_port;
1046 let sender_port_name = port_name.to_string();
1047 let initial_upstream = ndarray_port.to_string();
1048
1049 let data_jh = thread::Builder::new()
1050 .name(format!("plugin-data-{port_name}"))
1051 .spawn(move || {
1052 plugin_data_loop(
1053 shared,
1054 array_rx,
1055 param_rx,
1056 enable_callbacks_reason,
1057 blocking_callbacks_reason,
1058 data_enabled,
1059 data_blocking,
1060 nd_array_port_reason,
1061 sender_port_name,
1062 initial_upstream,
1063 wiring,
1064 );
1065 })
1066 .expect("failed to spawn plugin data thread");
1067
1068 let handle = PluginRuntimeHandle {
1069 port_runtime,
1070 array_sender,
1071 array_output: array_output_for_handle,
1072 port_name: port_name.to_string(),
1073 ndarray_params,
1074 plugin_params,
1075 };
1076
1077 (handle, data_jh)
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082 use super::*;
1083 use crate::ndarray::{NDDataType, NDDimension};
1084 use crate::plugin::channel::ndarray_channel;
1085
1086 struct PassthroughProcessor;
1088
1089 impl NDPluginProcess for PassthroughProcessor {
1090 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1091 ProcessResult::arrays(vec![Arc::new(array.clone())])
1092 }
1093 fn plugin_type(&self) -> &str {
1094 "Passthrough"
1095 }
1096 }
1097
1098 struct SinkProcessor {
1100 count: usize,
1101 }
1102
1103 impl NDPluginProcess for SinkProcessor {
1104 fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1105 self.count += 1;
1106 ProcessResult::empty()
1107 }
1108 fn plugin_type(&self) -> &str {
1109 "Sink"
1110 }
1111 }
1112
1113 fn make_test_array(id: i32) -> Arc<NDArray> {
1114 let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
1115 arr.unique_id = id;
1116 Arc::new(arr)
1117 }
1118
1119 fn test_wiring() -> Arc<WiringRegistry> {
1120 Arc::new(WiringRegistry::new())
1121 }
1122
1123 fn enable_callbacks(handle: &PluginRuntimeHandle) {
1125 handle
1126 .port_runtime()
1127 .port_handle()
1128 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1129 .unwrap();
1130 std::thread::sleep(std::time::Duration::from_millis(10));
1131 }
1132
1133 #[test]
1134 fn test_passthrough_runtime() {
1135 let pool = Arc::new(NDArrayPool::new(1_000_000));
1136
1137 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1139 let mut output = NDArrayOutput::new();
1140 output.add(downstream_sender);
1141
1142 let (handle, _data_jh) = create_plugin_runtime_with_output(
1143 "PASS1",
1144 PassthroughProcessor,
1145 pool,
1146 10,
1147 output,
1148 "",
1149 test_wiring(),
1150 );
1151 enable_callbacks(&handle);
1152
1153 handle.array_sender().send(make_test_array(42));
1155
1156 let received = downstream_rx.blocking_recv().unwrap();
1158 assert_eq!(received.unique_id, 42);
1159 }
1160
1161 #[test]
1162 fn test_sink_runtime() {
1163 let pool = Arc::new(NDArrayPool::new(1_000_000));
1164
1165 let (handle, _data_jh) = create_plugin_runtime(
1166 "SINK1",
1167 SinkProcessor { count: 0 },
1168 pool,
1169 10,
1170 "",
1171 test_wiring(),
1172 );
1173 enable_callbacks(&handle);
1174
1175 handle.array_sender().send(make_test_array(1));
1177 handle.array_sender().send(make_test_array(2));
1178
1179 std::thread::sleep(std::time::Duration::from_millis(50));
1181
1182 assert_eq!(handle.port_name(), "SINK1");
1184 }
1185
1186 #[test]
1187 fn test_plugin_type_param() {
1188 let pool = Arc::new(NDArrayPool::new(1_000_000));
1189
1190 let (handle, _data_jh) = create_plugin_runtime(
1191 "TYPE_TEST",
1192 PassthroughProcessor,
1193 pool,
1194 10,
1195 "",
1196 test_wiring(),
1197 );
1198
1199 assert_eq!(handle.port_name(), "TYPE_TEST");
1201 assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
1202 }
1203
1204 #[test]
1205 fn test_shutdown_on_handle_drop() {
1206 let pool = Arc::new(NDArrayPool::new(1_000_000));
1207
1208 let (handle, data_jh) = create_plugin_runtime(
1209 "SHUTDOWN_TEST",
1210 PassthroughProcessor,
1211 pool,
1212 10,
1213 "",
1214 test_wiring(),
1215 );
1216
1217 let sender = handle.array_sender().clone();
1219 drop(handle);
1220 drop(sender);
1221
1222 let result = data_jh.join();
1224 assert!(result.is_ok());
1225 }
1226
1227 #[test]
1228 fn test_dropped_count_when_queue_full() {
1229 let pool = Arc::new(NDArrayPool::new(1_000_000));
1230
1231 struct SlowProcessor;
1233 impl NDPluginProcess for SlowProcessor {
1234 fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1235 std::thread::sleep(std::time::Duration::from_millis(100));
1236 ProcessResult::empty()
1237 }
1238 fn plugin_type(&self) -> &str {
1239 "Slow"
1240 }
1241 }
1242
1243 let (handle, _data_jh) =
1244 create_plugin_runtime("DROP_TEST", SlowProcessor, pool, 1, "", test_wiring());
1245 enable_callbacks(&handle);
1246
1247 for i in 0..10 {
1249 handle.array_sender().send(make_test_array(i));
1250 }
1251
1252 assert!(handle.array_sender().dropped_count() > 0);
1254 }
1255
1256 #[test]
1257 fn test_blocking_callbacks_basic() {
1258 let pool = Arc::new(NDArrayPool::new(1_000_000));
1259 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1260 let mut output = NDArrayOutput::new();
1261 output.add(downstream_sender);
1262
1263 let (handle, _data_jh) = create_plugin_runtime_with_output(
1264 "BLOCK_TEST",
1265 PassthroughProcessor,
1266 pool,
1267 10,
1268 output,
1269 "",
1270 test_wiring(),
1271 );
1272 enable_callbacks(&handle);
1273
1274 handle
1276 .port_runtime()
1277 .port_handle()
1278 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1279 .unwrap();
1280 std::thread::sleep(std::time::Duration::from_millis(50));
1281
1282 handle.array_sender().send(make_test_array(42));
1284
1285 let received = downstream_rx.blocking_recv().unwrap();
1287 assert_eq!(received.unique_id, 42);
1288 }
1289
1290 #[test]
1291 fn test_blocking_to_nonblocking_switch() {
1292 let pool = Arc::new(NDArrayPool::new(1_000_000));
1293 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1294 let mut output = NDArrayOutput::new();
1295 output.add(downstream_sender);
1296
1297 let (handle, _data_jh) = create_plugin_runtime_with_output(
1298 "SWITCH_TEST",
1299 PassthroughProcessor,
1300 pool,
1301 10,
1302 output,
1303 "",
1304 test_wiring(),
1305 );
1306 enable_callbacks(&handle);
1307
1308 handle
1310 .port_runtime()
1311 .port_handle()
1312 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1313 .unwrap();
1314 std::thread::sleep(std::time::Duration::from_millis(50));
1315
1316 handle.array_sender().send(make_test_array(1));
1317 let received = downstream_rx.blocking_recv().unwrap();
1318 assert_eq!(received.unique_id, 1);
1319
1320 handle
1322 .port_runtime()
1323 .port_handle()
1324 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 0)
1325 .unwrap();
1326 std::thread::sleep(std::time::Duration::from_millis(50));
1327
1328 handle.array_sender().send(make_test_array(2));
1330 let received = downstream_rx.blocking_recv().unwrap();
1331 assert_eq!(received.unique_id, 2);
1332 }
1333
1334 #[test]
1335 fn test_enable_callbacks_disables_processing() {
1336 let pool = Arc::new(NDArrayPool::new(1_000_000));
1337 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1338 let mut output = NDArrayOutput::new();
1339 output.add(downstream_sender);
1340
1341 let (handle, _data_jh) = create_plugin_runtime_with_output(
1342 "ENABLE_TEST",
1343 PassthroughProcessor,
1344 pool,
1345 10,
1346 output,
1347 "",
1348 test_wiring(),
1349 );
1350
1351 handle
1353 .port_runtime()
1354 .port_handle()
1355 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 0)
1356 .unwrap();
1357 std::thread::sleep(std::time::Duration::from_millis(50));
1358
1359 handle.array_sender().send(make_test_array(99));
1361
1362 let rt = tokio::runtime::Builder::new_current_thread()
1364 .enable_all()
1365 .build()
1366 .unwrap();
1367 let result = rt.block_on(async {
1368 tokio::time::timeout(std::time::Duration::from_millis(100), downstream_rx.recv()).await
1369 });
1370 assert!(
1371 result.is_err(),
1372 "should not receive array when callbacks disabled"
1373 );
1374 }
1375
1376 #[test]
1377 fn test_blocking_downstream_receives() {
1378 let pool = Arc::new(NDArrayPool::new(1_000_000));
1379
1380 let (ds1, mut rx1) = ndarray_channel("DS1", 10);
1381 let (ds2, mut rx2) = ndarray_channel("DS2", 10);
1382 let mut output = NDArrayOutput::new();
1383 output.add(ds1);
1384 output.add(ds2);
1385
1386 let (handle, _data_jh) = create_plugin_runtime_with_output(
1387 "BLOCK_DS_TEST",
1388 PassthroughProcessor,
1389 pool,
1390 10,
1391 output,
1392 "",
1393 test_wiring(),
1394 );
1395 enable_callbacks(&handle);
1396
1397 handle
1399 .port_runtime()
1400 .port_handle()
1401 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1402 .unwrap();
1403 std::thread::sleep(std::time::Duration::from_millis(50));
1404
1405 handle.array_sender().send(make_test_array(77));
1406
1407 let r1 = rx1.blocking_recv().unwrap();
1409 let r2 = rx2.blocking_recv().unwrap();
1410 assert_eq!(r1.unique_id, 77);
1411 assert_eq!(r2.unique_id, 77);
1412 }
1413
1414 #[test]
1415 fn test_blocking_param_updates() {
1416 let pool = Arc::new(NDArrayPool::new(1_000_000));
1417
1418 struct ParamTracker;
1419 impl NDPluginProcess for ParamTracker {
1420 fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
1421 ProcessResult::arrays(vec![Arc::new(array.clone())])
1422 }
1423 fn plugin_type(&self) -> &str {
1424 "ParamTracker"
1425 }
1426 }
1427
1428 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1429 let mut output = NDArrayOutput::new();
1430 output.add(downstream_sender);
1431
1432 let (handle, _data_jh) = create_plugin_runtime_with_output(
1433 "PARAM_TEST",
1434 ParamTracker,
1435 pool,
1436 10,
1437 output,
1438 "",
1439 test_wiring(),
1440 );
1441 enable_callbacks(&handle);
1442
1443 handle
1445 .port_runtime()
1446 .port_handle()
1447 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1448 .unwrap();
1449 std::thread::sleep(std::time::Duration::from_millis(50));
1450
1451 handle.array_sender().send(make_test_array(1));
1453 let received = downstream_rx.blocking_recv().unwrap();
1454 assert_eq!(received.unique_id, 1);
1455
1456 handle
1458 .port_runtime()
1459 .port_handle()
1460 .write_int32_blocking(handle.plugin_params.enable_callbacks, 0, 1)
1461 .unwrap();
1462 std::thread::sleep(std::time::Duration::from_millis(50));
1463
1464 handle.array_sender().send(make_test_array(2));
1466 let received = downstream_rx.blocking_recv().unwrap();
1467 assert_eq!(received.unique_id, 2);
1468 }
1469
1470 #[test]
1472 fn test_no_panic_in_current_thread_runtime() {
1473 let pool = Arc::new(NDArrayPool::new(1_000_000));
1474 let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
1475 let mut output = NDArrayOutput::new();
1476 output.add(downstream_sender);
1477
1478 let (handle, _data_jh) = create_plugin_runtime_with_output(
1479 "CURRENT_THREAD_TEST",
1480 PassthroughProcessor,
1481 pool,
1482 10,
1483 output,
1484 "",
1485 test_wiring(),
1486 );
1487 enable_callbacks(&handle);
1488
1489 handle
1491 .port_runtime()
1492 .port_handle()
1493 .write_int32_blocking(handle.plugin_params.blocking_callbacks, 0, 1)
1494 .unwrap();
1495 std::thread::sleep(std::time::Duration::from_millis(50));
1496
1497 let rt = tokio::runtime::Builder::new_current_thread()
1499 .enable_all()
1500 .build()
1501 .unwrap();
1502 rt.block_on(async {
1503 handle.array_sender().send(make_test_array(99));
1504 });
1505
1506 let received = downstream_rx.blocking_recv().unwrap();
1507 assert_eq!(received.unique_id, 99);
1508 }
1509}