1use std::sync::Arc;
2use std::time::Instant;
3
4use asyn_rs::param::ParamType;
5use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
6use asyn_rs::runtime::config::RuntimeConfig;
7use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
8use asyn_rs::user::AsynUser;
9use parking_lot::Mutex;
10
11pub const NUM_STATS_TS_CHANNELS: usize = 23;
15
16pub const STATS_TS_CHANNEL_NAMES: [&str; NUM_STATS_TS_CHANNELS] = [
18 "TSMinValue",
19 "TSMinX",
20 "TSMinY",
21 "TSMaxValue",
22 "TSMaxX",
23 "TSMaxY",
24 "TSMeanValue",
25 "TSSigma",
26 "TSTotal",
27 "TSNet",
28 "TSCentroidTotal",
29 "TSCentroidX",
30 "TSCentroidY",
31 "TSSigmaX",
32 "TSSigmaY",
33 "TSSigmaXY",
34 "TSSkewX",
35 "TSSkewY",
36 "TSKurtosisX",
37 "TSKurtosisY",
38 "TSEccentricity",
39 "TSOrientation",
40 "TSTimestamp",
41];
42
43pub struct TimeSeriesData {
48 pub values: Vec<f64>,
49}
50
51pub type TimeSeriesSender = tokio::sync::mpsc::Sender<TimeSeriesData>;
53pub type TimeSeriesReceiver = tokio::sync::mpsc::Receiver<TimeSeriesData>;
55
56pub struct TsReceiverRegistry {
59 inner: std::sync::Mutex<std::collections::HashMap<String, (TimeSeriesReceiver, Vec<String>)>>,
60}
61
62impl TsReceiverRegistry {
63 pub fn new() -> Self {
64 Self {
65 inner: std::sync::Mutex::new(std::collections::HashMap::new()),
66 }
67 }
68
69 pub fn store(
71 &self,
72 upstream_port: &str,
73 receiver: TimeSeriesReceiver,
74 channel_names: Vec<String>,
75 ) {
76 let mut map = self.inner.lock().unwrap();
77 map.insert(upstream_port.to_string(), (receiver, channel_names));
78 }
79
80 pub fn take(&self, upstream_port: &str) -> Option<(TimeSeriesReceiver, Vec<String>)> {
82 let mut map = self.inner.lock().unwrap();
83 map.remove(upstream_port)
84 }
85}
86
87impl Default for TsReceiverRegistry {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
95pub enum TimeSeriesMode {
96 OneShot,
97 RingBuffer,
98}
99
100pub struct TimeSeries {
102 pub num_points: usize,
103 pub mode: TimeSeriesMode,
104 buffer: Vec<f64>,
105 write_pos: usize,
106 count: usize,
107}
108
109impl TimeSeries {
110 pub fn new(num_points: usize, mode: TimeSeriesMode) -> Self {
111 Self {
112 num_points,
113 mode,
114 buffer: vec![0.0; num_points],
115 write_pos: 0,
116 count: 0,
117 }
118 }
119
120 pub fn add_value(&mut self, value: f64) {
122 match self.mode {
123 TimeSeriesMode::OneShot => {
124 if self.write_pos < self.num_points {
125 self.buffer[self.write_pos] = value;
126 self.write_pos += 1;
127 self.count = self.write_pos;
128 }
129 }
130 TimeSeriesMode::RingBuffer => {
131 self.buffer[self.write_pos % self.num_points] = value;
132 self.write_pos += 1;
133 self.count = self.count.max(self.write_pos.min(self.num_points));
134 }
135 }
136 }
137
138 pub fn values(&self) -> Vec<f64> {
140 match self.mode {
141 TimeSeriesMode::OneShot => self.buffer[..self.count].to_vec(),
142 TimeSeriesMode::RingBuffer => {
143 if self.write_pos <= self.num_points {
144 self.buffer[..self.count].to_vec()
145 } else {
146 let start = self.write_pos % self.num_points;
147 let mut result = Vec::with_capacity(self.num_points);
148 result.extend_from_slice(&self.buffer[start..]);
149 result.extend_from_slice(&self.buffer[..start]);
150 result
151 }
152 }
153 }
154 }
155
156 pub fn count(&self) -> usize {
157 self.count
158 }
159
160 pub fn reset(&mut self) {
161 self.buffer.fill(0.0);
162 self.write_pos = 0;
163 self.count = 0;
164 }
165
166 pub fn resize(&mut self, num_points: usize) {
168 self.num_points = num_points;
169 self.buffer = vec![0.0; num_points];
170 self.write_pos = 0;
171 self.count = 0;
172 }
173
174 pub fn set_mode(&mut self, mode: TimeSeriesMode) {
176 self.mode = mode;
177 self.reset();
178 }
179}
180
181pub struct TSParams {
185 pub ts_acquire: usize,
186 pub ts_read: usize,
187 pub ts_num_points: usize,
188 pub ts_current_point: usize,
189 pub ts_time_per_point: usize,
190 pub ts_averaging_time: usize,
191 pub ts_num_average: usize,
192 pub ts_elapsed_time: usize,
193 pub ts_acquire_mode: usize,
194 pub ts_time_axis: usize,
195 pub ts_channels: Vec<usize>,
197 pub channel_names: Vec<String>,
199 pub ts_time_series: usize,
201 pub ts_timestamp: usize,
203}
204
205pub struct SharedTsState {
207 pub buffers: Vec<TimeSeries>,
208 pub acquiring: bool,
209 pub start_time: Option<Instant>,
210 pub num_points: usize,
211 pub mode: TimeSeriesMode,
212}
213
214impl SharedTsState {
215 fn new(num_channels: usize, num_points: usize) -> Self {
216 let buffers = (0..num_channels)
217 .map(|_| TimeSeries::new(num_points, TimeSeriesMode::OneShot))
218 .collect();
219 Self {
220 buffers,
221 acquiring: false,
222 start_time: None,
223 num_points,
224 mode: TimeSeriesMode::OneShot,
225 }
226 }
227}
228
229pub struct TimeSeriesPortDriver {
234 base: PortDriverBase,
235 params: TSParams,
236 shared: Arc<Mutex<SharedTsState>>,
237 num_channels: usize,
238 time_per_point: f64,
239}
240
241impl TimeSeriesPortDriver {
242 fn new(
243 port_name: &str,
244 channel_names: &[&str],
245 num_points: usize,
246 shared: Arc<Mutex<SharedTsState>>,
247 ) -> Self {
248 let num_channels = channel_names.len();
249 let mut base = PortDriverBase::new(
250 port_name,
251 1,
252 PortFlags {
253 multi_device: false,
254 can_block: false,
255 destructible: true,
256 },
257 );
258
259 let _ = ad_core_rs::params::ndarray_driver::NDArrayDriverParams::create(&mut base);
261 let _ = ad_core_rs::plugin::params::PluginBaseParams::create(&mut base);
262
263 let ts_acquire = base.create_param("TS_ACQUIRE", ParamType::Int32).unwrap();
265 let _ = base.set_int32_param(ts_acquire, 0, 0);
266 let ts_read = base.create_param("TS_READ", ParamType::Int32).unwrap();
267 let ts_num_points = base
268 .create_param("TS_NUM_POINTS", ParamType::Int32)
269 .unwrap();
270 let _ = base.set_int32_param(ts_num_points, 0, num_points as i32);
271 let ts_current_point = base
272 .create_param("TS_CURRENT_POINT", ParamType::Int32)
273 .unwrap();
274 let _ = base.set_int32_param(ts_current_point, 0, 0);
275 let ts_time_per_point = base
276 .create_param("TS_TIME_PER_POINT", ParamType::Float64)
277 .unwrap();
278 let ts_averaging_time = base
279 .create_param("TS_AVERAGING_TIME", ParamType::Float64)
280 .unwrap();
281 let ts_num_average = base
282 .create_param("TS_NUM_AVERAGE", ParamType::Int32)
283 .unwrap();
284 let _ = base.set_int32_param(ts_num_average, 0, 1);
285 let ts_elapsed_time = base
286 .create_param("TS_ELAPSED_TIME", ParamType::Float64)
287 .unwrap();
288 let ts_acquire_mode = base
289 .create_param("TS_ACQUIRE_MODE", ParamType::Int32)
290 .unwrap();
291 let _ = base.set_int32_param(ts_acquire_mode, 0, 0);
292 let ts_time_axis = base
293 .create_param("TS_TIME_AXIS", ParamType::Float64Array)
294 .unwrap();
295
296 let time_per_point = 1.0;
298 let time_axis: Vec<f64> = (0..num_points).map(|i| i as f64 * time_per_point).collect();
299 let _ = base.params.set_float64_array(ts_time_axis, 0, time_axis);
300
301 let mut ts_channels = Vec::with_capacity(num_channels);
303 for name in channel_names {
304 let param_name = format!("TS_CHAN_{name}");
305 let idx = base
306 .create_param(¶m_name, ParamType::Float64Array)
307 .unwrap();
308 let _ = base.params.set_float64_array(idx, 0, vec![0.0; num_points]);
309 ts_channels.push(idx);
310 }
311
312 let ts_time_series = base
314 .create_param("TS_TIME_SERIES", ParamType::Float64Array)
315 .unwrap();
316 let ts_timestamp = base
317 .create_param("TS_TIMESTAMP", ParamType::Float64Array)
318 .unwrap();
319
320 let params = TSParams {
321 ts_acquire,
322 ts_read,
323 ts_num_points,
324 ts_current_point,
325 ts_time_per_point,
326 ts_averaging_time,
327 ts_num_average,
328 ts_elapsed_time,
329 ts_acquire_mode,
330 ts_time_axis,
331 ts_channels,
332 channel_names: channel_names.iter().map(|s| s.to_string()).collect(),
333 ts_time_series,
334 ts_timestamp,
335 };
336
337 Self {
338 base,
339 params,
340 shared,
341 num_channels,
342 time_per_point,
343 }
344 }
345
346 fn update_waveform_params(&mut self) {
348 let state = self.shared.lock();
349 let num_points = state.num_points;
350
351 for (i, buf) in state.buffers.iter().enumerate() {
353 let mut values = buf.values();
354 values.resize(num_points, 0.0);
355 let _ = self
356 .base
357 .params
358 .set_float64_array(self.params.ts_channels[i], 0, values);
359 }
360
361 let current_point = state.buffers[0].count();
363 let _ = self
364 .base
365 .set_int32_param(self.params.ts_current_point, 0, current_point as i32);
366
367 if let Some(start) = state.start_time {
369 let elapsed = start.elapsed().as_secs_f64();
370 let _ = self
371 .base
372 .set_float64_param(self.params.ts_elapsed_time, 0, elapsed);
373 }
374
375 let acquiring = state.acquiring;
377 drop(state);
378
379 let _ = self
380 .base
381 .set_int32_param(self.params.ts_acquire, 0, if acquiring { 1 } else { 0 });
382
383 let _ = self.base.call_param_callbacks(0);
385 }
386}
387
388impl PortDriver for TimeSeriesPortDriver {
389 fn base(&self) -> &PortDriverBase {
390 &self.base
391 }
392
393 fn base_mut(&mut self) -> &mut PortDriverBase {
394 &mut self.base
395 }
396
397 fn write_int32(&mut self, user: &mut AsynUser, value: i32) -> asyn_rs::error::AsynResult<()> {
398 let reason = user.reason;
399
400 if reason == self.params.ts_acquire {
401 let mut state = self.shared.lock();
402 if value != 0 {
403 if !state.acquiring {
405 if state.buffers[0].count() == 0 {
407 for buf in state.buffers.iter_mut() {
408 buf.reset();
409 }
410 }
411 state.acquiring = true;
412 state.start_time = Some(Instant::now());
413 }
414 } else {
415 state.acquiring = false;
417 }
418 drop(state);
419 self.base.set_int32_param(reason, 0, value)?;
420 self.base.call_param_callbacks(0)?;
421 } else if reason == self.params.ts_read {
422 self.update_waveform_params();
424 } else if reason == self.params.ts_num_points {
425 let new_size = value.max(1) as usize;
426 let mut state = self.shared.lock();
427 state.num_points = new_size;
428 for buf in state.buffers.iter_mut() {
429 buf.resize(new_size);
430 }
431 state.acquiring = false;
432 drop(state);
433
434 let time_axis: Vec<f64> = (0..new_size)
436 .map(|i| i as f64 * self.time_per_point)
437 .collect();
438 let _ = self
439 .base
440 .params
441 .set_float64_array(self.params.ts_time_axis, 0, time_axis);
442
443 for i in 0..self.num_channels {
445 let _ = self.base.params.set_float64_array(
446 self.params.ts_channels[i],
447 0,
448 vec![0.0; new_size],
449 );
450 }
451
452 self.base.set_int32_param(reason, 0, value)?;
453 self.base
454 .set_int32_param(self.params.ts_current_point, 0, 0)?;
455 self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
456 self.base.call_param_callbacks(0)?;
457 } else if reason == self.params.ts_acquire_mode {
458 let mode = if value == 0 {
459 TimeSeriesMode::OneShot
460 } else {
461 TimeSeriesMode::RingBuffer
462 };
463 let mut state = self.shared.lock();
464 state.mode = mode;
465 for buf in state.buffers.iter_mut() {
466 buf.set_mode(mode);
467 }
468 state.acquiring = false;
469 drop(state);
470
471 self.base.set_int32_param(reason, 0, value)?;
472 self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
473 self.base.call_param_callbacks(0)?;
474 } else {
475 self.base.set_int32_param(reason, user.addr, value)?;
477 self.base.call_param_callbacks(user.addr)?;
478 }
479
480 Ok(())
481 }
482
483 fn write_float64(&mut self, user: &mut AsynUser, value: f64) -> asyn_rs::error::AsynResult<()> {
484 let reason = user.reason;
485 if reason == self.params.ts_time_per_point {
486 self.time_per_point = value;
487 self.base.set_float64_param(reason, user.addr, value)?;
488 let num_points = self.shared.lock().num_points;
490 let time_axis: Vec<f64> = (0..num_points)
491 .map(|i| i as f64 * self.time_per_point)
492 .collect();
493 let _ = self
494 .base
495 .params
496 .set_float64_array(self.params.ts_time_axis, 0, time_axis);
497 self.base.call_param_callbacks(user.addr)?;
498 } else {
499 self.base.set_float64_param(reason, user.addr, value)?;
500 self.base.call_param_callbacks(user.addr)?;
501 }
502 Ok(())
503 }
504
505 fn read_float64_array(
506 &mut self,
507 user: &AsynUser,
508 buf: &mut [f64],
509 ) -> asyn_rs::error::AsynResult<usize> {
510 let data = self.base.params.get_float64_array(user.reason, user.addr)?;
511 let n = data.len().min(buf.len());
512 buf[..n].copy_from_slice(&data[..n]);
513 Ok(n)
514 }
515}
516
517fn ts_data_thread(shared: Arc<Mutex<SharedTsState>>, mut data_rx: TimeSeriesReceiver) {
519 while let Some(data) = data_rx.blocking_recv() {
520 let mut state = shared.lock();
521 if !state.acquiring {
522 continue;
523 }
524 let n = data.values.len().min(state.buffers.len());
525 for i in 0..n {
526 state.buffers[i].add_value(data.values[i]);
527 }
528 if state.mode == TimeSeriesMode::OneShot && state.buffers[0].count() >= state.num_points {
530 state.acquiring = false;
531 }
532 }
533}
534
535pub fn create_ts_port_runtime(
541 port_name: &str,
542 channel_names: &[&str],
543 num_points: usize,
544 data_rx: TimeSeriesReceiver,
545) -> (
546 PortRuntimeHandle,
547 TSParams,
548 std::thread::JoinHandle<()>,
549 std::thread::JoinHandle<()>,
550) {
551 let num_channels = channel_names.len();
552 let shared = Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)));
553
554 let driver = TimeSeriesPortDriver::new(port_name, channel_names, num_points, shared.clone());
555
556 let ts_params = TSParams {
558 ts_acquire: driver.params.ts_acquire,
559 ts_read: driver.params.ts_read,
560 ts_num_points: driver.params.ts_num_points,
561 ts_current_point: driver.params.ts_current_point,
562 ts_time_per_point: driver.params.ts_time_per_point,
563 ts_averaging_time: driver.params.ts_averaging_time,
564 ts_num_average: driver.params.ts_num_average,
565 ts_elapsed_time: driver.params.ts_elapsed_time,
566 ts_acquire_mode: driver.params.ts_acquire_mode,
567 ts_time_axis: driver.params.ts_time_axis,
568 ts_channels: driver.params.ts_channels.clone(),
569 channel_names: driver.params.channel_names.clone(),
570 ts_time_series: driver.params.ts_time_series,
571 ts_timestamp: driver.params.ts_timestamp,
572 };
573
574 let (runtime_handle, actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
575
576 let data_jh = std::thread::Builder::new()
578 .name(format!("ts-data-{port_name}"))
579 .spawn(move || {
580 ts_data_thread(shared, data_rx);
581 })
582 .expect("failed to spawn TS data thread");
583
584 (runtime_handle, ts_params, actor_jh, data_jh)
585}
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590
591 #[test]
592 fn test_one_shot() {
593 let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
594 for i in 0..5 {
595 ts.add_value(i as f64);
596 }
597 assert_eq!(ts.count(), 5);
598 assert_eq!(ts.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);
599
600 ts.add_value(99.0);
602 assert_eq!(ts.count(), 5);
603 }
604
605 #[test]
606 fn test_ring_buffer() {
607 let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
608 for i in 0..6 {
609 ts.add_value(i as f64);
610 }
611 assert_eq!(ts.count(), 4);
612 assert_eq!(ts.values(), vec![2.0, 3.0, 4.0, 5.0]);
614 }
615
616 #[test]
617 fn test_ring_buffer_partial() {
618 let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
619 ts.add_value(10.0);
620 ts.add_value(20.0);
621 assert_eq!(ts.count(), 2);
622 assert_eq!(ts.values(), vec![10.0, 20.0]);
623 }
624
625 #[test]
626 fn test_reset() {
627 let mut ts = TimeSeries::new(3, TimeSeriesMode::OneShot);
628 ts.add_value(1.0);
629 ts.add_value(2.0);
630 ts.reset();
631 assert_eq!(ts.count(), 0);
632 assert!(ts.values().is_empty());
633 }
634
635 #[test]
636 fn test_resize() {
637 let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
638 ts.add_value(1.0);
639 ts.add_value(2.0);
640 ts.resize(3);
641 assert_eq!(ts.num_points, 3);
642 assert_eq!(ts.count(), 0);
643 assert!(ts.values().is_empty());
644 }
645
646 #[test]
647 fn test_set_mode() {
648 let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
649 ts.add_value(1.0);
650 ts.set_mode(TimeSeriesMode::RingBuffer);
651 assert_eq!(ts.mode, TimeSeriesMode::RingBuffer);
652 assert_eq!(ts.count(), 0);
653 }
654
655 const TEST_CHANNELS: [&str; 3] = ["ChA", "ChB", "ChC"];
658
659 #[test]
660 fn test_shared_ts_state_init() {
661 let state = SharedTsState::new(3, 100);
662 assert_eq!(state.buffers.len(), 3);
663 assert_eq!(state.num_points, 100);
664 assert!(!state.acquiring);
665 assert_eq!(state.mode, TimeSeriesMode::OneShot);
666 }
667
668 #[test]
669 fn test_ts_port_driver_create() {
670 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
671 let driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared);
672 assert_eq!(driver.base().port_name, "TEST_TS");
673 assert_eq!(driver.num_channels, 3);
674 assert!(!driver.base().flags.multi_device);
675 }
676
677 #[test]
678 fn test_ts_port_driver_write_acquire() {
679 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
680 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
681
682 let mut user = AsynUser::new(driver.params.ts_acquire);
684 driver.write_int32(&mut user, 1).unwrap();
685 assert!(shared.lock().acquiring);
686
687 driver.write_int32(&mut user, 0).unwrap();
689 assert!(!shared.lock().acquiring);
690 }
691
692 #[test]
693 fn test_ts_port_driver_write_num_points() {
694 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
695 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
696
697 let mut user = AsynUser::new(driver.params.ts_num_points);
698 driver.write_int32(&mut user, 50).unwrap();
699
700 let state = shared.lock();
701 assert_eq!(state.num_points, 50);
702 for buf in &state.buffers {
703 assert_eq!(buf.num_points, 50);
704 }
705 }
706
707 #[test]
708 fn test_ts_port_driver_write_mode() {
709 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
710 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
711
712 let mut user = AsynUser::new(driver.params.ts_acquire_mode);
713 driver.write_int32(&mut user, 1).unwrap();
714
715 let state = shared.lock();
716 assert_eq!(state.mode, TimeSeriesMode::RingBuffer);
717 for buf in &state.buffers {
718 assert_eq!(buf.mode, TimeSeriesMode::RingBuffer);
719 }
720 }
721
722 #[test]
723 fn test_ts_port_driver_update_waveforms() {
724 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
725 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 10, shared.clone());
726
727 {
729 let mut state = shared.lock();
730 state.acquiring = true;
731 state.start_time = Some(Instant::now());
732 for buf in state.buffers.iter_mut() {
733 buf.add_value(42.0);
734 buf.add_value(43.0);
735 }
736 }
737
738 driver.update_waveform_params();
740
741 let cp = driver
743 .base
744 .get_int32_param(driver.params.ts_current_point, 0)
745 .unwrap();
746 assert_eq!(cp, 2);
747
748 let data = driver
750 .base
751 .params
752 .get_float64_array(driver.params.ts_channels[0], 0)
753 .unwrap();
754 assert_eq!(data[0], 42.0);
755 assert_eq!(data[1], 43.0);
756 }
757
758 #[test]
759 fn test_ts_port_driver_read_array() {
760 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 5)));
761 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 5, shared);
762
763 let user = AsynUser::new(driver.params.ts_time_axis);
764 let mut buf = vec![0.0; 5];
765 let n = driver.read_float64_array(&user, &mut buf).unwrap();
766 assert_eq!(n, 5);
767 assert_eq!(buf, vec![0.0, 1.0, 2.0, 3.0, 4.0]);
768 }
769
770 #[test]
771 fn test_ts_data_ingestion_oneshot() {
772 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 3)));
773 let (tx, rx) = tokio::sync::mpsc::channel(16);
774
775 shared.lock().acquiring = true;
777
778 let shared_clone = shared.clone();
779 let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));
780
781 tx.blocking_send(TimeSeriesData {
783 values: vec![1.0, 10.0, 100.0],
784 })
785 .unwrap();
786 tx.blocking_send(TimeSeriesData {
787 values: vec![2.0, 20.0, 200.0],
788 })
789 .unwrap();
790 tx.blocking_send(TimeSeriesData {
791 values: vec![3.0, 30.0, 300.0],
792 })
793 .unwrap();
794 tx.blocking_send(TimeSeriesData {
795 values: vec![4.0, 40.0, 400.0],
796 })
797 .unwrap(); drop(tx);
801 jh.join().unwrap();
802
803 let state = shared.lock();
804 assert_eq!(state.buffers[0].count(), 3);
805 assert_eq!(state.buffers[0].values(), vec![1.0, 2.0, 3.0]);
806 assert_eq!(state.buffers[1].values(), vec![10.0, 20.0, 30.0]);
807 assert_eq!(state.buffers[2].values(), vec![100.0, 200.0, 300.0]);
808 assert!(!state.acquiring); }
810
811 #[test]
812 fn test_ts_data_ingestion_not_acquiring() {
813 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
814 let (tx, rx) = tokio::sync::mpsc::channel(16);
815
816 let shared_clone = shared.clone();
818 let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));
819
820 tx.blocking_send(TimeSeriesData {
821 values: vec![1.0, 2.0, 3.0],
822 })
823 .unwrap();
824
825 drop(tx);
826 jh.join().unwrap();
827
828 let state = shared.lock();
829 assert_eq!(state.buffers[0].count(), 0);
830 }
831
832 #[test]
833 fn test_create_ts_port_runtime() {
834 let (_tx, rx) = tokio::sync::mpsc::channel(16);
835 let (handle, params, _actor_jh, _data_jh) =
836 create_ts_port_runtime("TEST_TS_RT", &TEST_CHANNELS, 100, rx);
837 assert_eq!(handle.port_name(), "TEST_TS_RT");
838 assert_eq!(params.ts_channels.len(), 3);
839 handle.shutdown();
840 }
841}