1use std::sync::Arc;
2use std::time::Instant;
3
4use asyn_rs::param::ParamType;
5use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
6use asyn_rs::runtime::config::RuntimeConfig;
7use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
8use asyn_rs::user::AsynUser;
9use parking_lot::Mutex;
10
/// Number of entries in [`STATS_TS_CHANNEL_NAMES`].
///
/// The array type below is sized with this constant, so the two cannot drift apart
/// without a compile error.
pub const NUM_STATS_TS_CHANNELS: usize = 23;

/// Names of the statistics time-series channels.
///
/// NOTE(review): the order presumably matches the order of values in
/// `TimeSeriesData::values` produced upstream — confirm against the producer.
pub const STATS_TS_CHANNEL_NAMES: [&str; NUM_STATS_TS_CHANNELS] = [
    // Minimum / maximum.
    "TSMinValue",
    "TSMinX",
    "TSMinY",
    "TSMaxValue",
    "TSMaxX",
    "TSMaxY",
    // Whole-array statistics.
    "TSMeanValue",
    "TSSigma",
    "TSTotal",
    "TSNet",
    // Centroid and higher moments.
    "TSCentroidTotal",
    "TSCentroidX",
    "TSCentroidY",
    "TSSigmaX",
    "TSSigmaY",
    "TSSigmaXY",
    "TSSkewX",
    "TSSkewY",
    "TSKurtosisX",
    "TSKurtosisY",
    "TSEccentricity",
    "TSOrientation",
    // Timestamp channel.
    "TSTimestamp",
];
42
/// One sample per time-series channel, as carried over the time-series channel.
///
/// The data thread appends `values[i]` to the buffer of channel `i`; entries beyond the
/// number of configured channels are ignored there.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TimeSeriesData {
    /// New value for each channel, indexed by channel number.
    pub values: Vec<f64>,
}
50
51pub type TimeSeriesSender = tokio::sync::mpsc::Sender<TimeSeriesData>;
53pub type TimeSeriesReceiver = tokio::sync::mpsc::Receiver<TimeSeriesData>;
55
56pub struct TsReceiverRegistry {
59 inner: std::sync::Mutex<std::collections::HashMap<String, (TimeSeriesReceiver, Vec<String>)>>,
60}
61
62impl TsReceiverRegistry {
63 pub fn new() -> Self {
64 Self {
65 inner: std::sync::Mutex::new(std::collections::HashMap::new()),
66 }
67 }
68
69 pub fn store(
71 &self,
72 upstream_port: &str,
73 receiver: TimeSeriesReceiver,
74 channel_names: Vec<String>,
75 ) {
76 let mut map = self.inner.lock().unwrap();
77 map.insert(upstream_port.to_string(), (receiver, channel_names));
78 }
79
80 pub fn take(&self, upstream_port: &str) -> Option<(TimeSeriesReceiver, Vec<String>)> {
82 let mut map = self.inner.lock().unwrap();
83 map.remove(upstream_port)
84 }
85}
86
87impl Default for TsReceiverRegistry {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
/// What a [`TimeSeries`] does once `num_points` samples have been stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeSeriesMode {
    /// Stop accepting samples when the buffer is full.
    OneShot,
    /// Keep accepting samples, overwriting the oldest one.
    RingBuffer,
}

/// Fixed-capacity sample buffer for a single time-series channel.
#[derive(Debug, Clone)]
pub struct TimeSeries {
    /// Capacity in samples. Change it through [`TimeSeries::resize`] so the backing
    /// storage is reallocated to match.
    pub num_points: usize,
    /// Fill policy. Change it through [`TimeSeries::set_mode`] so stale data is cleared.
    pub mode: TimeSeriesMode,
    /// Backing storage, always allocated with `num_points` zeroed slots.
    buffer: Vec<f64>,
    /// Total number of samples written since the last reset (not wrapped).
    write_pos: usize,
    /// Number of valid samples currently held (at most `num_points`).
    count: usize,
}

impl TimeSeries {
    /// Creates an empty, zero-filled buffer holding up to `num_points` samples.
    pub fn new(num_points: usize, mode: TimeSeriesMode) -> Self {
        Self {
            num_points,
            mode,
            buffer: vec![0.0; num_points],
            write_pos: 0,
            count: 0,
        }
    }

    /// Appends one sample according to the current mode.
    ///
    /// * `OneShot`: the sample is stored while there is room and silently dropped once
    ///   the buffer is full.
    /// * `RingBuffer`: the sample always lands in the next slot, overwriting the oldest
    ///   sample once the buffer has wrapped.
    ///
    /// A buffer with zero capacity ignores every sample in both modes.
    pub fn add_value(&mut self, value: f64) {
        match self.mode {
            TimeSeriesMode::OneShot => {
                if self.write_pos < self.num_points {
                    self.buffer[self.write_pos] = value;
                    self.write_pos += 1;
                    self.count = self.write_pos;
                }
            }
            TimeSeriesMode::RingBuffer => {
                // A zero-capacity ring has no slot to overwrite, and `% 0` would panic.
                if self.num_points == 0 {
                    return;
                }
                self.buffer[self.write_pos % self.num_points] = value;
                self.write_pos += 1;
                self.count = self.count.max(self.write_pos.min(self.num_points));
            }
        }
    }

    /// Returns the stored samples, oldest first.
    ///
    /// The result has `count()` elements. For a ring buffer that has wrapped, the
    /// storage is rotated so the oldest surviving sample comes first.
    pub fn values(&self) -> Vec<f64> {
        match self.mode {
            TimeSeriesMode::OneShot => self.buffer[..self.count].to_vec(),
            TimeSeriesMode::RingBuffer => {
                if self.write_pos <= self.num_points {
                    // Not wrapped yet: storage order is already chronological.
                    self.buffer[..self.count].to_vec()
                } else {
                    // The next slot to be written holds the oldest sample.
                    let start = self.write_pos % self.num_points;
                    let mut result = Vec::with_capacity(self.num_points);
                    result.extend_from_slice(&self.buffer[start..]);
                    result.extend_from_slice(&self.buffer[..start]);
                    result
                }
            }
        }
    }

    /// Number of valid samples currently stored.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Discards all samples and zero-fills the storage; capacity and mode are kept.
    pub fn reset(&mut self) {
        self.buffer.fill(0.0);
        self.write_pos = 0;
        self.count = 0;
    }

    /// Changes the capacity to `num_points`, discarding all stored samples.
    pub fn resize(&mut self, num_points: usize) {
        self.num_points = num_points;
        self.buffer = vec![0.0; num_points];
        self.write_pos = 0;
        self.count = 0;
    }

    /// Switches the fill policy and discards all stored samples, even when `mode` is
    /// the mode already in effect.
    pub fn set_mode(&mut self, mode: TimeSeriesMode) {
        self.mode = mode;
        self.reset();
    }
}
180
/// Parameter indices of a time-series port, as returned by `create_param` when the
/// driver is constructed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TSParams {
    /// Index of `TS_ACQUIRE` (Int32).
    pub ts_acquire: usize,
    /// Index of `TS_READ` (Int32).
    pub ts_read: usize,
    /// Index of `TS_NUM_POINTS` (Int32).
    pub ts_num_points: usize,
    /// Index of `TS_CURRENT_POINT` (Int32).
    pub ts_current_point: usize,
    /// Index of `TS_TIME_PER_POINT` (Float64).
    pub ts_time_per_point: usize,
    /// Index of `TS_AVERAGING_TIME` (Float64).
    pub ts_averaging_time: usize,
    /// Index of `TS_NUM_AVERAGE` (Int32).
    pub ts_num_average: usize,
    /// Index of `TS_ELAPSED_TIME` (Float64).
    pub ts_elapsed_time: usize,
    /// Index of `TS_ACQUIRE_MODE` (Int32).
    pub ts_acquire_mode: usize,
    /// Index of `TS_TIME_AXIS` (Float64Array).
    pub ts_time_axis: usize,
    /// Indices of the per-channel `TS_CHAN_<name>` waveforms, in channel order.
    pub ts_channels: Vec<usize>,
    /// Channel names, parallel to `ts_channels`.
    pub channel_names: Vec<String>,
}
200
201pub struct SharedTsState {
203 pub buffers: Vec<TimeSeries>,
204 pub acquiring: bool,
205 pub start_time: Option<Instant>,
206 pub num_points: usize,
207 pub mode: TimeSeriesMode,
208}
209
210impl SharedTsState {
211 fn new(num_channels: usize, num_points: usize) -> Self {
212 let buffers = (0..num_channels)
213 .map(|_| TimeSeries::new(num_points, TimeSeriesMode::OneShot))
214 .collect();
215 Self {
216 buffers,
217 acquiring: false,
218 start_time: None,
219 num_points,
220 mode: TimeSeriesMode::OneShot,
221 }
222 }
223}
224
225pub struct TimeSeriesPortDriver {
230 base: PortDriverBase,
231 params: TSParams,
232 shared: Arc<Mutex<SharedTsState>>,
233 num_channels: usize,
234}
235
236impl TimeSeriesPortDriver {
237 fn new(
238 port_name: &str,
239 channel_names: &[&str],
240 num_points: usize,
241 shared: Arc<Mutex<SharedTsState>>,
242 ) -> Self {
243 let num_channels = channel_names.len();
244 let mut base = PortDriverBase::new(
245 port_name,
246 1,
247 PortFlags {
248 multi_device: false,
249 can_block: false,
250 destructible: true,
251 },
252 );
253
254 let _ = ad_core_rs::params::ndarray_driver::NDArrayDriverParams::create(&mut base);
256 let _ = ad_core_rs::plugin::params::PluginBaseParams::create(&mut base);
257
258 let ts_acquire = base.create_param("TS_ACQUIRE", ParamType::Int32).unwrap();
260 let _ = base.set_int32_param(ts_acquire, 0, 0);
261 let ts_read = base.create_param("TS_READ", ParamType::Int32).unwrap();
262 let ts_num_points = base
263 .create_param("TS_NUM_POINTS", ParamType::Int32)
264 .unwrap();
265 let _ = base.set_int32_param(ts_num_points, 0, num_points as i32);
266 let ts_current_point = base
267 .create_param("TS_CURRENT_POINT", ParamType::Int32)
268 .unwrap();
269 let _ = base.set_int32_param(ts_current_point, 0, 0);
270 let ts_time_per_point = base
271 .create_param("TS_TIME_PER_POINT", ParamType::Float64)
272 .unwrap();
273 let ts_averaging_time = base
274 .create_param("TS_AVERAGING_TIME", ParamType::Float64)
275 .unwrap();
276 let ts_num_average = base
277 .create_param("TS_NUM_AVERAGE", ParamType::Int32)
278 .unwrap();
279 let _ = base.set_int32_param(ts_num_average, 0, 1);
280 let ts_elapsed_time = base
281 .create_param("TS_ELAPSED_TIME", ParamType::Float64)
282 .unwrap();
283 let ts_acquire_mode = base
284 .create_param("TS_ACQUIRE_MODE", ParamType::Int32)
285 .unwrap();
286 let _ = base.set_int32_param(ts_acquire_mode, 0, 0);
287 let ts_time_axis = base
288 .create_param("TS_TIME_AXIS", ParamType::Float64Array)
289 .unwrap();
290
291 let time_axis: Vec<f64> = (0..num_points).map(|i| i as f64).collect();
293 let _ = base.params.set_float64_array(ts_time_axis, 0, time_axis);
294
295 let mut ts_channels = Vec::with_capacity(num_channels);
297 for name in channel_names {
298 let param_name = format!("TS_CHAN_{name}");
299 let idx = base
300 .create_param(¶m_name, ParamType::Float64Array)
301 .unwrap();
302 let _ = base.params.set_float64_array(idx, 0, vec![0.0; num_points]);
303 ts_channels.push(idx);
304 }
305
306 let params = TSParams {
307 ts_acquire,
308 ts_read,
309 ts_num_points,
310 ts_current_point,
311 ts_time_per_point,
312 ts_averaging_time,
313 ts_num_average,
314 ts_elapsed_time,
315 ts_acquire_mode,
316 ts_time_axis,
317 ts_channels,
318 channel_names: channel_names.iter().map(|s| s.to_string()).collect(),
319 };
320
321 Self {
322 base,
323 params,
324 shared,
325 num_channels,
326 }
327 }
328
329 fn update_waveform_params(&mut self) {
331 let state = self.shared.lock();
332 let num_points = state.num_points;
333
334 for (i, buf) in state.buffers.iter().enumerate() {
336 let mut values = buf.values();
337 values.resize(num_points, 0.0);
338 let _ = self
339 .base
340 .params
341 .set_float64_array(self.params.ts_channels[i], 0, values);
342 }
343
344 let current_point = state.buffers[0].count();
346 let _ = self
347 .base
348 .set_int32_param(self.params.ts_current_point, 0, current_point as i32);
349
350 if let Some(start) = state.start_time {
352 let elapsed = start.elapsed().as_secs_f64();
353 let _ = self
354 .base
355 .set_float64_param(self.params.ts_elapsed_time, 0, elapsed);
356 }
357
358 let acquiring = state.acquiring;
360 drop(state);
361
362 let _ = self
363 .base
364 .set_int32_param(self.params.ts_acquire, 0, if acquiring { 1 } else { 0 });
365
366 let _ = self.base.call_param_callbacks(0);
368 }
369}
370
371impl PortDriver for TimeSeriesPortDriver {
372 fn base(&self) -> &PortDriverBase {
373 &self.base
374 }
375
376 fn base_mut(&mut self) -> &mut PortDriverBase {
377 &mut self.base
378 }
379
380 fn write_int32(&mut self, user: &mut AsynUser, value: i32) -> asyn_rs::error::AsynResult<()> {
381 let reason = user.reason;
382
383 if reason == self.params.ts_acquire {
384 let mut state = self.shared.lock();
385 if value != 0 {
386 if !state.acquiring {
388 if state.buffers[0].count() == 0 {
390 for buf in state.buffers.iter_mut() {
391 buf.reset();
392 }
393 }
394 state.acquiring = true;
395 state.start_time = Some(Instant::now());
396 }
397 } else {
398 state.acquiring = false;
400 }
401 drop(state);
402 self.base.set_int32_param(reason, 0, value)?;
403 self.base.call_param_callbacks(0)?;
404 } else if reason == self.params.ts_read {
405 self.update_waveform_params();
407 } else if reason == self.params.ts_num_points {
408 let new_size = value.max(1) as usize;
409 let mut state = self.shared.lock();
410 state.num_points = new_size;
411 for buf in state.buffers.iter_mut() {
412 buf.resize(new_size);
413 }
414 state.acquiring = false;
415 drop(state);
416
417 let time_axis: Vec<f64> = (0..new_size).map(|i| i as f64).collect();
419 let _ = self
420 .base
421 .params
422 .set_float64_array(self.params.ts_time_axis, 0, time_axis);
423
424 for i in 0..self.num_channels {
426 let _ = self.base.params.set_float64_array(
427 self.params.ts_channels[i],
428 0,
429 vec![0.0; new_size],
430 );
431 }
432
433 self.base.set_int32_param(reason, 0, value)?;
434 self.base
435 .set_int32_param(self.params.ts_current_point, 0, 0)?;
436 self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
437 self.base.call_param_callbacks(0)?;
438 } else if reason == self.params.ts_acquire_mode {
439 let mode = if value == 0 {
440 TimeSeriesMode::OneShot
441 } else {
442 TimeSeriesMode::RingBuffer
443 };
444 let mut state = self.shared.lock();
445 state.mode = mode;
446 for buf in state.buffers.iter_mut() {
447 buf.set_mode(mode);
448 }
449 state.acquiring = false;
450 drop(state);
451
452 self.base.set_int32_param(reason, 0, value)?;
453 self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
454 self.base.call_param_callbacks(0)?;
455 } else {
456 self.base.set_int32_param(reason, user.addr, value)?;
458 self.base.call_param_callbacks(user.addr)?;
459 }
460
461 Ok(())
462 }
463
464 fn read_float64_array(
465 &mut self,
466 user: &AsynUser,
467 buf: &mut [f64],
468 ) -> asyn_rs::error::AsynResult<usize> {
469 let data = self.base.params.get_float64_array(user.reason, user.addr)?;
470 let n = data.len().min(buf.len());
471 buf[..n].copy_from_slice(&data[..n]);
472 Ok(n)
473 }
474}
475
476fn ts_data_thread(shared: Arc<Mutex<SharedTsState>>, mut data_rx: TimeSeriesReceiver) {
478 while let Some(data) = data_rx.blocking_recv() {
479 let mut state = shared.lock();
480 if !state.acquiring {
481 continue;
482 }
483 let n = data.values.len().min(state.buffers.len());
484 for i in 0..n {
485 state.buffers[i].add_value(data.values[i]);
486 }
487 if state.mode == TimeSeriesMode::OneShot && state.buffers[0].count() >= state.num_points {
489 state.acquiring = false;
490 }
491 }
492}
493
494pub fn create_ts_port_runtime(
500 port_name: &str,
501 channel_names: &[&str],
502 num_points: usize,
503 data_rx: TimeSeriesReceiver,
504) -> (
505 PortRuntimeHandle,
506 TSParams,
507 std::thread::JoinHandle<()>,
508 std::thread::JoinHandle<()>,
509) {
510 let num_channels = channel_names.len();
511 let shared = Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)));
512
513 let driver = TimeSeriesPortDriver::new(port_name, channel_names, num_points, shared.clone());
514
515 let ts_params = TSParams {
517 ts_acquire: driver.params.ts_acquire,
518 ts_read: driver.params.ts_read,
519 ts_num_points: driver.params.ts_num_points,
520 ts_current_point: driver.params.ts_current_point,
521 ts_time_per_point: driver.params.ts_time_per_point,
522 ts_averaging_time: driver.params.ts_averaging_time,
523 ts_num_average: driver.params.ts_num_average,
524 ts_elapsed_time: driver.params.ts_elapsed_time,
525 ts_acquire_mode: driver.params.ts_acquire_mode,
526 ts_time_axis: driver.params.ts_time_axis,
527 ts_channels: driver.params.ts_channels.clone(),
528 channel_names: driver.params.channel_names.clone(),
529 };
530
531 let (runtime_handle, actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
532
533 let data_jh = std::thread::Builder::new()
535 .name(format!("ts-data-{port_name}"))
536 .spawn(move || {
537 ts_data_thread(shared, data_rx);
538 })
539 .expect("failed to spawn TS data thread");
540
541 (runtime_handle, ts_params, actor_jh, data_jh)
542}
543
#[cfg(test)]
mod tests {
    use super::*;

    /// Channel names shared by the driver-level tests.
    const TEST_CHANNELS: [&str; 3] = ["ChA", "ChB", "ChC"];

    #[test]
    fn test_one_shot() {
        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
        for i in 0..5 {
            ts.add_value(i as f64);
        }
        assert_eq!(ts.count(), 5);
        assert_eq!(ts.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);

        // A sample arriving after the buffer is full must be dropped without
        // disturbing the stored data.
        ts.add_value(99.0);
        assert_eq!(ts.count(), 5);
        assert_eq!(ts.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);
    }

    #[test]
    fn test_ring_buffer() {
        let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
        for i in 0..6 {
            ts.add_value(i as f64);
        }
        assert_eq!(ts.count(), 4);
        // The two oldest samples were overwritten; the rest come back oldest first.
        assert_eq!(ts.values(), vec![2.0, 3.0, 4.0, 5.0]);
    }

    #[test]
    fn test_ring_buffer_partial() {
        let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
        ts.add_value(10.0);
        ts.add_value(20.0);
        assert_eq!(ts.count(), 2);
        assert_eq!(ts.values(), vec![10.0, 20.0]);
    }

    #[test]
    fn test_reset() {
        let mut ts = TimeSeries::new(3, TimeSeriesMode::OneShot);
        ts.add_value(1.0);
        ts.add_value(2.0);
        ts.reset();
        assert_eq!(ts.count(), 0);
        assert!(ts.values().is_empty());
    }

    #[test]
    fn test_resize() {
        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
        ts.add_value(1.0);
        ts.add_value(2.0);
        ts.resize(3);
        assert_eq!(ts.num_points, 3);
        assert_eq!(ts.count(), 0);
        assert!(ts.values().is_empty());
    }

    #[test]
    fn test_set_mode() {
        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
        ts.add_value(1.0);
        ts.set_mode(TimeSeriesMode::RingBuffer);
        assert_eq!(ts.mode, TimeSeriesMode::RingBuffer);
        assert_eq!(ts.count(), 0);
    }

    #[test]
    fn test_shared_ts_state_init() {
        let state = SharedTsState::new(3, 100);
        assert_eq!(state.buffers.len(), 3);
        assert_eq!(state.num_points, 100);
        assert!(!state.acquiring);
        assert_eq!(state.mode, TimeSeriesMode::OneShot);
    }

    #[test]
    fn test_ts_port_driver_create() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared);
        assert_eq!(driver.base().port_name, "TEST_TS");
        assert_eq!(driver.num_channels, 3);
        assert!(!driver.base().flags.multi_device);
    }

    #[test]
    fn test_ts_port_driver_write_acquire() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());

        let mut user = AsynUser::new(driver.params.ts_acquire);

        // Non-zero starts acquisition.
        driver.write_int32(&mut user, 1).unwrap();
        assert!(shared.lock().acquiring);

        // Zero stops it again.
        driver.write_int32(&mut user, 0).unwrap();
        assert!(!shared.lock().acquiring);
    }

    #[test]
    fn test_ts_port_driver_write_num_points() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());

        let mut user = AsynUser::new(driver.params.ts_num_points);
        driver.write_int32(&mut user, 50).unwrap();

        let state = shared.lock();
        assert_eq!(state.num_points, 50);
        assert!(!state.acquiring);
        for buf in &state.buffers {
            assert_eq!(buf.num_points, 50);
        }
    }

    #[test]
    fn test_ts_port_driver_write_mode() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());

        let mut user = AsynUser::new(driver.params.ts_acquire_mode);
        driver.write_int32(&mut user, 1).unwrap();

        let state = shared.lock();
        assert_eq!(state.mode, TimeSeriesMode::RingBuffer);
        for buf in &state.buffers {
            assert_eq!(buf.mode, TimeSeriesMode::RingBuffer);
        }
    }

    #[test]
    fn test_ts_port_driver_update_waveforms() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 10, shared.clone());

        // Fill two points per channel directly, as the data thread would.
        {
            let mut state = shared.lock();
            state.acquiring = true;
            state.start_time = Some(Instant::now());
            for buf in state.buffers.iter_mut() {
                buf.add_value(42.0);
                buf.add_value(43.0);
            }
        }

        driver.update_waveform_params();

        let cp = driver
            .base
            .get_int32_param(driver.params.ts_current_point, 0)
            .unwrap();
        assert_eq!(cp, 2);

        let data = driver
            .base
            .params
            .get_float64_array(driver.params.ts_channels[0], 0)
            .unwrap();
        assert_eq!(data[0], 42.0);
        assert_eq!(data[1], 43.0);
    }

    #[test]
    fn test_ts_port_driver_read_array() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 5)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 5, shared);

        let user = AsynUser::new(driver.params.ts_time_axis);
        let mut buf = vec![0.0; 5];
        let n = driver.read_float64_array(&user, &mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(buf, vec![0.0, 1.0, 2.0, 3.0, 4.0]);
    }

    #[test]
    fn test_ts_data_ingestion_oneshot() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 3)));
        let (tx, rx) = tokio::sync::mpsc::channel(16);

        shared.lock().acquiring = true;

        let shared_clone = shared.clone();
        let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));

        // Three samples fill the buffers; the fourth arrives after acquisition has
        // stopped and must be ignored.
        tx.blocking_send(TimeSeriesData {
            values: vec![1.0, 10.0, 100.0],
        })
        .unwrap();
        tx.blocking_send(TimeSeriesData {
            values: vec![2.0, 20.0, 200.0],
        })
        .unwrap();
        tx.blocking_send(TimeSeriesData {
            values: vec![3.0, 30.0, 300.0],
        })
        .unwrap();
        tx.blocking_send(TimeSeriesData {
            values: vec![4.0, 40.0, 400.0],
        })
        .unwrap();

        // Closing the channel lets the data thread finish.
        drop(tx);
        jh.join().unwrap();

        let state = shared.lock();
        assert_eq!(state.buffers[0].count(), 3);
        assert_eq!(state.buffers[0].values(), vec![1.0, 2.0, 3.0]);
        assert_eq!(state.buffers[1].values(), vec![10.0, 20.0, 30.0]);
        assert_eq!(state.buffers[2].values(), vec![100.0, 200.0, 300.0]);
        assert!(!state.acquiring);
    }

    #[test]
    fn test_ts_data_ingestion_not_acquiring() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
        let (tx, rx) = tokio::sync::mpsc::channel(16);

        // `acquiring` stays false, so the sample below must be discarded.
        let shared_clone = shared.clone();
        let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));

        tx.blocking_send(TimeSeriesData {
            values: vec![1.0, 2.0, 3.0],
        })
        .unwrap();

        drop(tx);
        jh.join().unwrap();

        let state = shared.lock();
        assert_eq!(state.buffers[0].count(), 0);
    }

    #[test]
    fn test_create_ts_port_runtime() {
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        let (handle, params, _actor_jh, data_jh) =
            create_ts_port_runtime("TEST_TS_RT", &TEST_CHANNELS, 100, rx);
        assert_eq!(handle.port_name(), "TEST_TS_RT");
        assert_eq!(params.ts_channels.len(), 3);
        handle.shutdown();

        // Dropping the only sender closes the channel, so the data thread returns and
        // can be joined instead of being left blocked.
        drop(tx);
        data_jh.join().unwrap();
    }
}