1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3use std::thread;
4
5use asyn_rs::error::AsynResult;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::runtime::config::RuntimeConfig;
8use asyn_rs::runtime::port::{create_port_runtime, PortRuntimeHandle};
9use asyn_rs::user::AsynUser;
10
11use asyn_rs::port_handle::PortHandle;
12
13use crate::ndarray::NDArray;
14use crate::ndarray_pool::NDArrayPool;
15use crate::params::ndarray_driver::NDArrayDriverParams;
16
17use super::channel::{ndarray_channel, BlockingProcessFn, NDArrayOutput, NDArrayReceiver, NDArraySender};
18use super::params::PluginBaseParams;
19use super::wiring::WiringRegistry;
20
/// Value carried by a parameter-change notification.
///
/// The port driver's write handlers wrap the written value in one of these
/// variants before forwarding it to the plugin data thread.
#[derive(Debug, Clone)]
pub enum ParamChangeValue {
    /// Value written through the int32 interface.
    Int32(i32),
    /// Value written through the float64 interface.
    Float64(f64),
    /// Text written through the octet interface.
    Octet(String),
}

impl ParamChangeValue {
    /// Returns the value as an `i32`.
    ///
    /// A `Float64` is converted with an `as` cast (truncates toward zero,
    /// saturates at the `i32` bounds, NaN becomes 0). An `Octet` yields 0.
    pub fn as_i32(&self) -> i32 {
        match *self {
            Self::Int32(n) => n,
            Self::Float64(x) => x as i32,
            Self::Octet(_) => 0,
        }
    }

    /// Returns the value as an `f64`; an `Octet` yields 0.0.
    pub fn as_f64(&self) -> f64 {
        match *self {
            Self::Int32(n) => n as f64,
            Self::Float64(x) => x,
            Self::Octet(_) => 0.0,
        }
    }

    /// Returns the text of an `Octet` value, or `None` for numeric variants.
    pub fn as_string(&self) -> Option<&str> {
        if let Self::Octet(text) = self {
            Some(text)
        } else {
            None
        }
    }
}
53
/// A parameter write requested by a processor as part of its [`ProcessResult`].
///
/// Each update names the parameter (`reason`), the asyn address it applies to
/// (`addr`) and the new value.
#[derive(Debug, Clone, PartialEq)]
pub enum ParamUpdate {
    /// Write an `i32` value to parameter `reason` at address `addr`.
    Int32 { reason: usize, addr: i32, value: i32 },
    /// Write an `f64` value to parameter `reason` at address `addr`.
    Float64 { reason: usize, addr: i32, value: f64 },
}

impl ParamUpdate {
    /// Int32 update targeting address 0.
    pub fn int32(reason: usize, value: i32) -> Self {
        Self::int32_addr(reason, 0, value)
    }

    /// Float64 update targeting address 0.
    pub fn float64(reason: usize, value: f64) -> Self {
        Self::float64_addr(reason, 0, value)
    }

    /// Int32 update targeting an explicit address.
    pub fn int32_addr(reason: usize, addr: i32, value: i32) -> Self {
        Self::Int32 { reason, addr, value }
    }

    /// Float64 update targeting an explicit address.
    pub fn float64_addr(reason: usize, addr: i32, value: f64) -> Self {
        Self::Float64 { reason, addr, value }
    }
}
78
79pub struct ProcessResult {
81 pub output_arrays: Vec<Arc<NDArray>>,
82 pub param_updates: Vec<ParamUpdate>,
83}
84
85impl ProcessResult {
86 pub fn sink(param_updates: Vec<ParamUpdate>) -> Self {
88 Self { output_arrays: vec![], param_updates }
89 }
90
91 pub fn arrays(output_arrays: Vec<Arc<NDArray>>) -> Self {
93 Self { output_arrays, param_updates: vec![] }
94 }
95
96 pub fn empty() -> Self {
98 Self { output_arrays: vec![], param_updates: vec![] }
99 }
100}
101
/// Processing logic plugged into a plugin runtime.
///
/// Implementors must be `Send + 'static` because the processor is moved into
/// shared state that the plugin data thread uses.
pub trait NDPluginProcess: Send + 'static {
    /// Handles one input array and returns the arrays to publish plus any
    /// parameter updates to apply.
    ///
    /// `pool` is the array pool the runtime was created with.
    fn process_array(&mut self, array: &NDArray, pool: &NDArrayPool) -> ProcessResult;

    /// Name stored in the port's plugin-type string parameter when the
    /// runtime is created.
    fn plugin_type(&self) -> &str;

    /// Hook for registering processor-specific parameters on the port.
    ///
    /// Called once while the port driver is constructed, after the standard
    /// parameters have been created. The default registers nothing.
    fn register_params(&mut self, _base: &mut PortDriverBase) -> Result<(), asyn_rs::error::AsynError> {
        Ok(())
    }

    /// Called by the data loop for every parameter write forwarded from the
    /// port driver. The default ignores the change.
    fn on_param_change(&mut self, _reason: usize, _params: &PluginParamSnapshot) {}
}
118
119pub struct PluginParamSnapshot {
121 pub enable_callbacks: bool,
122 pub reason: usize,
124 pub addr: i32,
126 pub value: ParamChangeValue,
128}
129
130struct SharedProcessorInner<P: NDPluginProcess> {
133 processor: P,
134 output: Arc<parking_lot::Mutex<NDArrayOutput>>,
135 pool: Arc<NDArrayPool>,
136 ndarray_params: NDArrayDriverParams,
137 plugin_params: PluginBaseParams,
138 port_handle: PortHandle,
139 array_counter: i32,
140}
141
142impl<P: NDPluginProcess> SharedProcessorInner<P> {
143 fn process_and_publish(&mut self, array: &NDArray) {
144 let t0 = std::time::Instant::now();
145 let result = self.processor.process_array(array, &self.pool);
146 let elapsed_ms = t0.elapsed().as_secs_f64() * 1000.0;
147
148 let output = self.output.lock();
150 for out in &result.output_arrays {
151 output.publish(out.clone());
152 }
153 drop(output);
154
155 self.array_counter += 1;
159 let report_arr = result.output_arrays.first().map(|a| a.as_ref()).unwrap_or(array);
160 let info = report_arr.info();
161 let color_mode = if report_arr.dims.len() <= 2 { 0 } else { 2 };
162 self.port_handle.write_int32_no_wait(self.ndarray_params.array_counter, 0, self.array_counter);
163 self.port_handle.write_int32_no_wait(self.ndarray_params.unique_id, 0, report_arr.unique_id);
164 self.port_handle.write_int32_no_wait(self.ndarray_params.n_dimensions, 0, report_arr.dims.len() as i32);
165 self.port_handle.write_int32_no_wait(self.ndarray_params.array_size_x, 0, info.x_size as i32);
166 self.port_handle.write_int32_no_wait(self.ndarray_params.array_size_y, 0, info.y_size as i32);
167 self.port_handle.write_int32_no_wait(self.ndarray_params.array_size_z, 0, info.color_size as i32);
168 self.port_handle.write_int32_no_wait(self.ndarray_params.array_size, 0, info.total_bytes as i32);
169 self.port_handle.write_int32_no_wait(self.ndarray_params.data_type, 0, report_arr.data.data_type() as i32);
170 self.port_handle.write_int32_no_wait(self.ndarray_params.color_mode, 0, color_mode);
171
172 let ts_f64 = array.timestamp.as_f64();
173 self.port_handle.write_float64_no_wait(self.ndarray_params.timestamp_rbv, 0, ts_f64);
174 self.port_handle.write_int32_no_wait(self.ndarray_params.epics_ts_sec, 0, array.timestamp.sec as i32);
175 self.port_handle.write_int32_no_wait(self.ndarray_params.epics_ts_nsec, 0, array.timestamp.nsec as i32);
176
177 self.port_handle.write_float64_no_wait(self.plugin_params.execution_time, 0, elapsed_ms);
178
179 let mut extra_addrs: Vec<i32> = Vec::new();
181 for update in &result.param_updates {
182 match update {
183 ParamUpdate::Int32 { reason, addr, value } => {
184 self.port_handle.write_int32_no_wait(*reason, *addr, *value);
185 if *addr != 0 && !extra_addrs.contains(addr) {
186 extra_addrs.push(*addr);
187 }
188 }
189 ParamUpdate::Float64 { reason, addr, value } => {
190 self.port_handle.write_float64_no_wait(*reason, *addr, *value);
191 if *addr != 0 && !extra_addrs.contains(addr) {
192 extra_addrs.push(*addr);
193 }
194 }
195 }
196 }
197
198 self.port_handle.call_param_callbacks_no_wait(0);
199 for addr in extra_addrs {
200 self.port_handle.call_param_callbacks_no_wait(addr);
201 }
202 }
203}
204
205struct BlockingProcessorHandle<P: NDPluginProcess> {
208 inner: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
209}
210
211impl<P: NDPluginProcess> BlockingProcessFn for BlockingProcessorHandle<P> {
212 fn process_and_publish(&self, array: &NDArray) {
213 self.inner.lock().process_and_publish(array);
214 }
215}
216
217#[allow(dead_code)]
219pub struct PluginPortDriver {
220 base: PortDriverBase,
221 ndarray_params: NDArrayDriverParams,
222 plugin_params: PluginBaseParams,
223 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
224}
225
226impl PluginPortDriver {
227 fn new<P: NDPluginProcess>(
228 port_name: &str,
229 plugin_type_name: &str,
230 queue_size: usize,
231 ndarray_port: &str,
232 max_addr: usize,
233 param_change_tx: tokio::sync::mpsc::Sender<(usize, i32, ParamChangeValue)>,
234 processor: &mut P,
235 ) -> AsynResult<Self> {
236 let mut base = PortDriverBase::new(
237 port_name,
238 max_addr,
239 PortFlags {
240 can_block: true,
241 ..Default::default()
242 },
243 );
244
245 let ndarray_params = NDArrayDriverParams::create(&mut base)?;
246 let plugin_params = PluginBaseParams::create(&mut base)?;
247
248 base.set_int32_param(plugin_params.enable_callbacks, 0, 0)?;
250 base.set_int32_param(plugin_params.blocking_callbacks, 0, 0)?;
251 base.set_int32_param(plugin_params.queue_size, 0, queue_size as i32)?;
252 base.set_int32_param(plugin_params.dropped_arrays, 0, 0)?;
253 base.set_int32_param(plugin_params.queue_use, 0, 0)?;
254 base.set_string_param(plugin_params.plugin_type, 0, plugin_type_name.into())?;
255 base.set_int32_param(ndarray_params.array_callbacks, 0, 1)?;
256
257 base.set_string_param(ndarray_params.port_name_self, 0, port_name.into())?;
259 if !ndarray_port.is_empty() {
260 base.set_string_param(plugin_params.nd_array_port, 0, ndarray_port.into())?;
261 }
262
263 processor.register_params(&mut base)?;
265
266 Ok(Self {
267 base,
268 ndarray_params,
269 plugin_params,
270 param_change_tx,
271 })
272 }
273}
274
275impl PortDriver for PluginPortDriver {
276 fn base(&self) -> &PortDriverBase {
277 &self.base
278 }
279
280 fn base_mut(&mut self) -> &mut PortDriverBase {
281 &mut self.base
282 }
283
284 fn io_write_int32(&mut self, user: &mut AsynUser, value: i32) -> AsynResult<()> {
285 let reason = user.reason;
286 let addr = user.addr;
287 self.base.set_int32_param(reason, addr, value)?;
288 self.base.call_param_callbacks(addr)?;
289 let _ = self.param_change_tx.try_send((reason, addr, ParamChangeValue::Int32(value)));
290 Ok(())
291 }
292
293 fn io_write_float64(&mut self, user: &mut AsynUser, value: f64) -> AsynResult<()> {
294 let reason = user.reason;
295 let addr = user.addr;
296 self.base.set_float64_param(reason, addr, value)?;
297 self.base.call_param_callbacks(addr)?;
298 let _ = self.param_change_tx.try_send((reason, addr, ParamChangeValue::Float64(value)));
299 Ok(())
300 }
301
302 fn io_write_octet(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<()> {
303 let reason = user.reason;
304 let addr = user.addr;
305 let s = String::from_utf8_lossy(data).into_owned();
306 self.base.set_string_param(reason, addr, s.clone())?;
307 self.base.call_param_callbacks(addr)?;
308 let _ = self.param_change_tx.try_send((reason, addr, ParamChangeValue::Octet(s)));
309 Ok(())
310 }
311}
312
313#[derive(Clone)]
315pub struct PluginRuntimeHandle {
316 port_runtime: PortRuntimeHandle,
317 array_sender: NDArraySender,
318 array_output: Arc<parking_lot::Mutex<NDArrayOutput>>,
319 port_name: String,
320 pub ndarray_params: NDArrayDriverParams,
321 pub plugin_params: PluginBaseParams,
322}
323
324impl PluginRuntimeHandle {
325 pub fn port_runtime(&self) -> &PortRuntimeHandle {
326 &self.port_runtime
327 }
328
329 pub fn array_sender(&self) -> &NDArraySender {
330 &self.array_sender
331 }
332
333 pub fn array_output(&self) -> &Arc<parking_lot::Mutex<NDArrayOutput>> {
334 &self.array_output
335 }
336
337 pub fn port_name(&self) -> &str {
338 &self.port_name
339 }
340}
341
342pub fn create_plugin_runtime<P: NDPluginProcess>(
349 port_name: &str,
350 processor: P,
351 pool: Arc<NDArrayPool>,
352 queue_size: usize,
353 ndarray_port: &str,
354 wiring: Arc<WiringRegistry>,
355) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
356 create_plugin_runtime_multi_addr(port_name, processor, pool, queue_size, ndarray_port, wiring, 1)
357}
358
359pub fn create_plugin_runtime_multi_addr<P: NDPluginProcess>(
363 port_name: &str,
364 mut processor: P,
365 pool: Arc<NDArrayPool>,
366 queue_size: usize,
367 ndarray_port: &str,
368 wiring: Arc<WiringRegistry>,
369 max_addr: usize,
370) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
371 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
373
374 let plugin_type_name = processor.plugin_type().to_string();
376
377 let driver = PluginPortDriver::new(port_name, &plugin_type_name, queue_size, ndarray_port, max_addr, param_tx, &mut processor)
379 .expect("failed to create plugin port driver");
380
381 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
382 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
383 let ndarray_params = driver.ndarray_params;
384 let plugin_params = driver.plugin_params;
385
386 let (port_runtime, _actor_jh) =
388 create_port_runtime(driver, RuntimeConfig::default());
389
390 let port_handle = port_runtime.port_handle().clone();
392
393 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
395
396 let enabled = Arc::new(AtomicBool::new(false));
398 let blocking_mode = Arc::new(AtomicBool::new(false));
399
400 let array_output = Arc::new(parking_lot::Mutex::new(NDArrayOutput::new()));
402 let array_output_for_handle = array_output.clone();
403 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
404 processor,
405 output: array_output,
406 pool,
407 ndarray_params,
408 plugin_params,
409 port_handle,
410 array_counter: 0,
411 }));
412
413 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
415 inner: shared.clone(),
416 });
417
418 let data_enabled = enabled.clone();
419 let data_blocking = blocking_mode.clone();
420 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
421
422 let nd_array_port_reason = plugin_params.nd_array_port;
424 let sender_port_name = port_name.to_string();
425 let initial_upstream = ndarray_port.to_string();
426
427 let data_jh = thread::Builder::new()
429 .name(format!("plugin-data-{port_name}"))
430 .spawn(move || {
431 plugin_data_loop(
432 shared,
433 array_rx,
434 param_rx,
435 enable_callbacks_reason,
436 blocking_callbacks_reason,
437 data_enabled,
438 data_blocking,
439 nd_array_port_reason,
440 sender_port_name,
441 initial_upstream,
442 wiring,
443 );
444 })
445 .expect("failed to spawn plugin data thread");
446
447 let handle = PluginRuntimeHandle {
448 port_runtime,
449 array_sender,
450 array_output: array_output_for_handle,
451 port_name: port_name.to_string(),
452 ndarray_params,
453 plugin_params,
454 };
455
456 (handle, data_jh)
457}
458
459fn plugin_data_loop<P: NDPluginProcess>(
460 shared: Arc<parking_lot::Mutex<SharedProcessorInner<P>>>,
461 mut array_rx: NDArrayReceiver,
462 mut param_rx: tokio::sync::mpsc::Receiver<(usize, i32, ParamChangeValue)>,
463 enable_callbacks_reason: usize,
464 blocking_callbacks_reason: usize,
465 enabled: Arc<AtomicBool>,
466 blocking_mode: Arc<AtomicBool>,
467 nd_array_port_reason: usize,
468 sender_port_name: String,
469 initial_upstream: String,
470 wiring: Arc<WiringRegistry>,
471) {
472 let mut current_upstream = initial_upstream;
473 let rt = tokio::runtime::Builder::new_current_thread()
474 .enable_all()
475 .build()
476 .unwrap();
477 rt.block_on(async {
478 loop {
479 tokio::select! {
480 msg = array_rx.recv_msg() => {
481 match msg {
482 Some(msg) => {
483 if !blocking_mode.load(Ordering::Acquire) {
486 shared.lock().process_and_publish(&msg.array);
487 }
488 }
490 None => break,
491 }
492 }
493 param = param_rx.recv() => {
494 match param {
495 Some((reason, addr, value)) => {
496 if reason == enable_callbacks_reason {
497 enabled.store(value.as_i32() != 0, Ordering::Release);
498 }
499 if reason == blocking_callbacks_reason {
500 blocking_mode.store(value.as_i32() != 0, Ordering::Release);
501 }
502 if reason == nd_array_port_reason {
504 if let Some(new_port) = value.as_string() {
505 let old = std::mem::replace(&mut current_upstream, new_port.to_string());
506 if let Err(e) = wiring.rewire_by_name(&sender_port_name, &old, new_port) {
507 eprintln!("NDArrayPort rewire failed: {e}");
508 current_upstream = old;
510 }
511 }
512 }
513 let snapshot = PluginParamSnapshot {
514 enable_callbacks: enabled.load(Ordering::Acquire),
515 reason,
516 addr,
517 value,
518 };
519 shared.lock().processor.on_param_change(reason, &snapshot);
520 }
521 None => break,
522 }
523 }
524 }
525 }
526 });
527}
528
529pub fn wire_downstream(upstream: &PluginRuntimeHandle, downstream_sender: NDArraySender) {
531 upstream.array_output().lock().add(downstream_sender);
532}
533
534pub fn create_plugin_runtime_with_output<P: NDPluginProcess>(
536 port_name: &str,
537 mut processor: P,
538 pool: Arc<NDArrayPool>,
539 queue_size: usize,
540 output: NDArrayOutput,
541 ndarray_port: &str,
542 wiring: Arc<WiringRegistry>,
543) -> (PluginRuntimeHandle, thread::JoinHandle<()>) {
544 let (param_tx, param_rx) = tokio::sync::mpsc::channel::<(usize, i32, ParamChangeValue)>(64);
545
546 let plugin_type_name = processor.plugin_type().to_string();
547 let driver = PluginPortDriver::new(port_name, &plugin_type_name, queue_size, ndarray_port, 1, param_tx, &mut processor)
548 .expect("failed to create plugin port driver");
549
550 let enable_callbacks_reason = driver.plugin_params.enable_callbacks;
551 let blocking_callbacks_reason = driver.plugin_params.blocking_callbacks;
552 let ndarray_params = driver.ndarray_params;
553 let plugin_params = driver.plugin_params;
554
555 let (port_runtime, _actor_jh) =
556 create_port_runtime(driver, RuntimeConfig::default());
557
558 let port_handle = port_runtime.port_handle().clone();
559
560 let (array_sender, array_rx) = ndarray_channel(port_name, queue_size);
561
562 let enabled = Arc::new(AtomicBool::new(false));
563 let blocking_mode = Arc::new(AtomicBool::new(false));
564
565 let array_output = Arc::new(parking_lot::Mutex::new(output));
566 let array_output_for_handle = array_output.clone();
567 let shared = Arc::new(parking_lot::Mutex::new(SharedProcessorInner {
568 processor,
569 output: array_output,
570 pool,
571 ndarray_params,
572 plugin_params,
573 port_handle,
574 array_counter: 0,
575 }));
576
577 let bp: Arc<dyn BlockingProcessFn> = Arc::new(BlockingProcessorHandle {
578 inner: shared.clone(),
579 });
580
581 let data_enabled = enabled.clone();
582 let data_blocking = blocking_mode.clone();
583 let array_sender = array_sender.with_blocking_support(enabled, blocking_mode, bp);
584
585 let nd_array_port_reason = plugin_params.nd_array_port;
587 let sender_port_name = port_name.to_string();
588 let initial_upstream = ndarray_port.to_string();
589
590 let data_jh = thread::Builder::new()
591 .name(format!("plugin-data-{port_name}"))
592 .spawn(move || {
593 plugin_data_loop(
594 shared,
595 array_rx,
596 param_rx,
597 enable_callbacks_reason,
598 blocking_callbacks_reason,
599 data_enabled,
600 data_blocking,
601 nd_array_port_reason,
602 sender_port_name,
603 initial_upstream,
604 wiring,
605 );
606 })
607 .expect("failed to spawn plugin data thread");
608
609 let handle = PluginRuntimeHandle {
610 port_runtime,
611 array_sender,
612 array_output: array_output_for_handle,
613 port_name: port_name.to_string(),
614 ndarray_params,
615 plugin_params,
616 };
617
618 (handle, data_jh)
619}
620
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ndarray::{NDDataType, NDDimension};
    use crate::plugin::channel::ndarray_channel;
    use std::sync::atomic::AtomicUsize;
    use std::time::{Duration, Instant};

    /// Time given to the data thread to observe a parameter write.
    const PARAM_SETTLE: Duration = Duration::from_millis(50);

    /// Processor that republishes a copy of every input array.
    struct PassthroughProcessor;

    impl NDPluginProcess for PassthroughProcessor {
        fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            ProcessResult::arrays(vec![Arc::new(array.clone())])
        }
        fn plugin_type(&self) -> &str {
            "Passthrough"
        }
    }

    /// Processor that publishes nothing and counts the arrays it has seen.
    /// The counter is shared so a test can read it from outside.
    struct SinkProcessor {
        count: Arc<AtomicUsize>,
    }

    impl NDPluginProcess for SinkProcessor {
        fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
            self.count.fetch_add(1, Ordering::SeqCst);
            ProcessResult::empty()
        }
        fn plugin_type(&self) -> &str {
            "Sink"
        }
    }

    fn make_test_array(id: i32) -> Arc<NDArray> {
        let mut arr = NDArray::new(vec![NDDimension::new(4)], NDDataType::UInt8);
        arr.unique_id = id;
        Arc::new(arr)
    }

    fn test_wiring() -> Arc<WiringRegistry> {
        Arc::new(WiringRegistry::new())
    }

    /// Writes an int32 parameter at address 0, then waits for the data thread
    /// to pick the change up.
    fn write_plugin_int32(handle: &PluginRuntimeHandle, reason: usize, value: i32) {
        handle
            .port_runtime()
            .port_handle()
            .write_int32_blocking(reason, 0, value)
            .unwrap();
        std::thread::sleep(PARAM_SETTLE);
    }

    fn enable_callbacks(handle: &PluginRuntimeHandle) {
        write_plugin_int32(handle, handle.plugin_params.enable_callbacks, 1);
    }

    fn set_blocking_callbacks(handle: &PluginRuntimeHandle, on: bool) {
        write_plugin_int32(handle, handle.plugin_params.blocking_callbacks, i32::from(on));
    }

    /// Passthrough plugin wired to one downstream channel, with callbacks
    /// still disabled.
    fn passthrough_with_downstream(
        port_name: &str,
    ) -> (PluginRuntimeHandle, std::thread::JoinHandle<()>, NDArrayReceiver) {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let (downstream_sender, downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let mut output = NDArrayOutput::new();
        output.add(downstream_sender);

        let (handle, data_jh) = create_plugin_runtime_with_output(
            port_name,
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        (handle, data_jh, downstream_rx)
    }

    #[test]
    fn test_passthrough_runtime() {
        let (handle, _data_jh, mut downstream_rx) = passthrough_with_downstream("PASS1");
        enable_callbacks(&handle);

        handle.array_sender().send(make_test_array(42));

        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 42);
    }

    #[test]
    fn test_sink_runtime() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));
        let count = Arc::new(AtomicUsize::new(0));

        let (handle, _data_jh) = create_plugin_runtime(
            "SINK1",
            SinkProcessor {
                count: Arc::clone(&count),
            },
            pool,
            10,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        handle.array_sender().send(make_test_array(1));
        handle.array_sender().send(make_test_array(2));

        // Poll with a deadline rather than a fixed sleep, so the test is both
        // fast and tolerant of a slow scheduler.
        let deadline = Instant::now() + Duration::from_secs(2);
        while count.load(Ordering::SeqCst) < 2 && Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(5));
        }

        assert_eq!(
            count.load(Ordering::SeqCst),
            2,
            "sink processor should have seen both arrays"
        );
        assert_eq!(handle.port_name(), "SINK1");
    }

    #[test]
    fn test_plugin_type_param() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));

        let (handle, _data_jh) = create_plugin_runtime(
            "TYPE_TEST",
            PassthroughProcessor,
            pool,
            10,
            "",
            test_wiring(),
        );

        assert_eq!(handle.port_name(), "TYPE_TEST");
        assert_eq!(handle.port_runtime().port_name(), "TYPE_TEST");
    }

    #[test]
    fn test_shutdown_on_handle_drop() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));

        let (handle, data_jh) = create_plugin_runtime(
            "SHUTDOWN_TEST",
            PassthroughProcessor,
            pool,
            10,
            "",
            test_wiring(),
        );

        // Dropping every handle and sender must let the data thread exit.
        let sender = handle.array_sender().clone();
        drop(handle);
        drop(sender);

        let result = data_jh.join();
        assert!(result.is_ok());
    }

    #[test]
    fn test_dropped_count_when_queue_full() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));

        /// Processor slow enough that a queue of one fills up immediately.
        struct SlowProcessor;
        impl NDPluginProcess for SlowProcessor {
            fn process_array(&mut self, _array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                std::thread::sleep(Duration::from_millis(100));
                ProcessResult::empty()
            }
            fn plugin_type(&self) -> &str {
                "Slow"
            }
        }

        let (handle, _data_jh) = create_plugin_runtime(
            "DROP_TEST",
            SlowProcessor,
            pool,
            1,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);

        for i in 0..10 {
            handle.array_sender().send(make_test_array(i));
        }

        assert!(handle.array_sender().dropped_count() > 0);
    }

    #[test]
    fn test_blocking_callbacks_basic() {
        let (handle, _data_jh, mut downstream_rx) = passthrough_with_downstream("BLOCK_TEST");
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        handle.array_sender().send(make_test_array(42));

        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 42);
    }

    #[test]
    fn test_blocking_to_nonblocking_switch() {
        let (handle, _data_jh, mut downstream_rx) = passthrough_with_downstream("SWITCH_TEST");
        enable_callbacks(&handle);

        set_blocking_callbacks(&handle, true);
        handle.array_sender().send(make_test_array(1));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 1);

        set_blocking_callbacks(&handle, false);
        handle.array_sender().send(make_test_array(2));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 2);
    }

    #[test]
    fn test_enable_callbacks_disables_processing() {
        let (handle, _data_jh, mut downstream_rx) = passthrough_with_downstream("ENABLE_TEST");

        // Explicitly write "disabled" so the flag is known, not just defaulted.
        write_plugin_int32(&handle, handle.plugin_params.enable_callbacks, 0);

        handle.array_sender().send(make_test_array(99));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let result = rt.block_on(async {
            tokio::time::timeout(Duration::from_millis(100), downstream_rx.recv()).await
        });
        assert!(
            result.is_err(),
            "should not receive array when callbacks disabled"
        );
    }

    #[test]
    fn test_blocking_downstream_receives() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));

        let (ds1, mut rx1) = ndarray_channel("DS1", 10);
        let (ds2, mut rx2) = ndarray_channel("DS2", 10);
        let mut output = NDArrayOutput::new();
        output.add(ds1);
        output.add(ds2);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "BLOCK_DS_TEST",
            PassthroughProcessor,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        handle.array_sender().send(make_test_array(77));

        let r1 = rx1.blocking_recv().unwrap();
        let r2 = rx2.blocking_recv().unwrap();
        assert_eq!(r1.unique_id, 77);
        assert_eq!(r2.unique_id, 77);
    }

    #[test]
    fn test_blocking_param_updates() {
        let pool = Arc::new(NDArrayPool::new(1_000_000));

        struct ParamTracker;
        impl NDPluginProcess for ParamTracker {
            fn process_array(&mut self, array: &NDArray, _pool: &NDArrayPool) -> ProcessResult {
                ProcessResult::arrays(vec![Arc::new(array.clone())])
            }
            fn plugin_type(&self) -> &str {
                "ParamTracker"
            }
        }

        let (downstream_sender, mut downstream_rx) = ndarray_channel("DOWNSTREAM", 10);
        let mut output = NDArrayOutput::new();
        output.add(downstream_sender);

        let (handle, _data_jh) = create_plugin_runtime_with_output(
            "PARAM_TEST",
            ParamTracker,
            pool,
            10,
            output,
            "",
            test_wiring(),
        );
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        handle.array_sender().send(make_test_array(1));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 1);

        // A further parameter write while blocking must not break processing.
        write_plugin_int32(&handle, handle.plugin_params.enable_callbacks, 1);

        handle.array_sender().send(make_test_array(2));
        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 2);
    }

    #[test]
    fn test_no_panic_in_current_thread_runtime() {
        let (handle, _data_jh, mut downstream_rx) =
            passthrough_with_downstream("CURRENT_THREAD_TEST");
        enable_callbacks(&handle);
        set_blocking_callbacks(&handle, true);

        // Sending from inside a current-thread runtime must not panic.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            handle.array_sender().send(make_test_array(99));
        });

        let received = downstream_rx.blocking_recv().unwrap();
        assert_eq!(received.unique_id, 99);
    }
}