1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use asyn_rs::param::ParamType;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::user::AsynUser;
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use parking_lot::Mutex;
11
12use ad_core::plugin::registry::{ParamInfo, ParamRegistry};
13
14pub const NUM_STATS_TS_CHANNELS: usize = 23;
18
19pub const STATS_TS_CHANNEL_NAMES: [&str; NUM_STATS_TS_CHANNELS] = [
21 "TSMinValue",
22 "TSMinX",
23 "TSMinY",
24 "TSMaxValue",
25 "TSMaxX",
26 "TSMaxY",
27 "TSMeanValue",
28 "TSSigma",
29 "TSTotal",
30 "TSNet",
31 "TSCentroidTotal",
32 "TSCentroidX",
33 "TSCentroidY",
34 "TSSigmaX",
35 "TSSigmaY",
36 "TSSigmaXY",
37 "TSSkewX",
38 "TSSkewY",
39 "TSKurtosisX",
40 "TSKurtosisY",
41 "TSEccentricity",
42 "TSOrientation",
43 "TSTimestamp",
44];
45
46pub struct TimeSeriesData {
51 pub values: Vec<f64>,
52}
53
54pub type TimeSeriesSender = tokio::sync::mpsc::Sender<TimeSeriesData>;
56pub type TimeSeriesReceiver = tokio::sync::mpsc::Receiver<TimeSeriesData>;
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum TimeSeriesMode {
62 OneShot,
63 RingBuffer,
64}
65
66pub struct TimeSeries {
68 pub num_points: usize,
69 pub mode: TimeSeriesMode,
70 buffer: Vec<f64>,
71 write_pos: usize,
72 count: usize,
73}
74
75impl TimeSeries {
76 pub fn new(num_points: usize, mode: TimeSeriesMode) -> Self {
77 Self {
78 num_points,
79 mode,
80 buffer: vec![0.0; num_points],
81 write_pos: 0,
82 count: 0,
83 }
84 }
85
86 pub fn add_value(&mut self, value: f64) {
88 match self.mode {
89 TimeSeriesMode::OneShot => {
90 if self.write_pos < self.num_points {
91 self.buffer[self.write_pos] = value;
92 self.write_pos += 1;
93 self.count = self.write_pos;
94 }
95 }
96 TimeSeriesMode::RingBuffer => {
97 self.buffer[self.write_pos % self.num_points] = value;
98 self.write_pos += 1;
99 self.count = self.count.max(self.write_pos.min(self.num_points));
100 }
101 }
102 }
103
104 pub fn values(&self) -> Vec<f64> {
106 match self.mode {
107 TimeSeriesMode::OneShot => self.buffer[..self.count].to_vec(),
108 TimeSeriesMode::RingBuffer => {
109 if self.write_pos <= self.num_points {
110 self.buffer[..self.count].to_vec()
111 } else {
112 let start = self.write_pos % self.num_points;
113 let mut result = Vec::with_capacity(self.num_points);
114 result.extend_from_slice(&self.buffer[start..]);
115 result.extend_from_slice(&self.buffer[..start]);
116 result
117 }
118 }
119 }
120 }
121
122 pub fn count(&self) -> usize {
123 self.count
124 }
125
126 pub fn reset(&mut self) {
127 self.buffer.fill(0.0);
128 self.write_pos = 0;
129 self.count = 0;
130 }
131
132 pub fn resize(&mut self, num_points: usize) {
134 self.num_points = num_points;
135 self.buffer = vec![0.0; num_points];
136 self.write_pos = 0;
137 self.count = 0;
138 }
139
140 pub fn set_mode(&mut self, mode: TimeSeriesMode) {
142 self.mode = mode;
143 self.reset();
144 }
145}
146
147pub struct TSParams {
151 pub ts_acquire: usize,
152 pub ts_read: usize,
153 pub ts_num_points: usize,
154 pub ts_current_point: usize,
155 pub ts_time_per_point: usize,
156 pub ts_averaging_time: usize,
157 pub ts_num_average: usize,
158 pub ts_elapsed_time: usize,
159 pub ts_acquire_mode: usize,
160 pub ts_time_axis: usize,
161 pub ts_channels: Vec<usize>,
163 pub channel_names: Vec<String>,
165}
166
167pub struct SharedTsState {
169 pub buffers: Vec<TimeSeries>,
170 pub acquiring: bool,
171 pub start_time: Option<Instant>,
172 pub num_points: usize,
173 pub mode: TimeSeriesMode,
174}
175
176impl SharedTsState {
177 fn new(num_channels: usize, num_points: usize) -> Self {
178 let buffers = (0..num_channels)
179 .map(|_| TimeSeries::new(num_points, TimeSeriesMode::OneShot))
180 .collect();
181 Self {
182 buffers,
183 acquiring: false,
184 start_time: None,
185 num_points,
186 mode: TimeSeriesMode::OneShot,
187 }
188 }
189}
190
191pub struct TimeSeriesPortDriver {
196 base: PortDriverBase,
197 params: TSParams,
198 shared: Arc<Mutex<SharedTsState>>,
199 num_channels: usize,
200}
201
202impl TimeSeriesPortDriver {
203 fn new(
204 port_name: &str,
205 channel_names: &[&str],
206 num_points: usize,
207 shared: Arc<Mutex<SharedTsState>>,
208 ) -> Self {
209 let num_channels = channel_names.len();
210 let mut base = PortDriverBase::new(port_name, 1, PortFlags {
211 multi_device: false,
212 can_block: false,
213 destructible: true,
214 });
215
216 let ts_acquire = base.create_param("TS_ACQUIRE", ParamType::Int32).unwrap();
218 let _ = base.set_int32_param(ts_acquire, 0, 0);
219 let ts_read = base.create_param("TS_READ", ParamType::Int32).unwrap();
220 let ts_num_points = base.create_param("TS_NUM_POINTS", ParamType::Int32).unwrap();
221 let _ = base.set_int32_param(ts_num_points, 0, num_points as i32);
222 let ts_current_point = base.create_param("TS_CURRENT_POINT", ParamType::Int32).unwrap();
223 let _ = base.set_int32_param(ts_current_point, 0, 0);
224 let ts_time_per_point = base.create_param("TS_TIME_PER_POINT", ParamType::Float64).unwrap();
225 let ts_averaging_time = base.create_param("TS_AVERAGING_TIME", ParamType::Float64).unwrap();
226 let ts_num_average = base.create_param("TS_NUM_AVERAGE", ParamType::Int32).unwrap();
227 let _ = base.set_int32_param(ts_num_average, 0, 1);
228 let ts_elapsed_time = base.create_param("TS_ELAPSED_TIME", ParamType::Float64).unwrap();
229 let ts_acquire_mode = base.create_param("TS_ACQUIRE_MODE", ParamType::Int32).unwrap();
230 let _ = base.set_int32_param(ts_acquire_mode, 0, 0);
231 let ts_time_axis = base.create_param("TS_TIME_AXIS", ParamType::Float64Array).unwrap();
232
233 let time_axis: Vec<f64> = (0..num_points).map(|i| i as f64).collect();
235 let _ = base.params.set_float64_array(ts_time_axis, 0, time_axis);
236
237 let mut ts_channels = Vec::with_capacity(num_channels);
239 for name in channel_names {
240 let param_name = format!("TS_CHAN_{name}");
241 let idx = base.create_param(¶m_name, ParamType::Float64Array).unwrap();
242 let _ = base.params.set_float64_array(idx, 0, vec![0.0; num_points]);
243 ts_channels.push(idx);
244 }
245
246 let params = TSParams {
247 ts_acquire,
248 ts_read,
249 ts_num_points,
250 ts_current_point,
251 ts_time_per_point,
252 ts_averaging_time,
253 ts_num_average,
254 ts_elapsed_time,
255 ts_acquire_mode,
256 ts_time_axis,
257 ts_channels,
258 channel_names: channel_names.iter().map(|s| s.to_string()).collect(),
259 };
260
261 Self { base, params, shared, num_channels }
262 }
263
264 fn update_waveform_params(&mut self) {
266 let state = self.shared.lock();
267 let num_points = state.num_points;
268
269 for (i, buf) in state.buffers.iter().enumerate() {
271 let mut values = buf.values();
272 values.resize(num_points, 0.0);
273 let _ = self.base.params.set_float64_array(
274 self.params.ts_channels[i], 0, values,
275 );
276 }
277
278 let current_point = state.buffers[0].count();
280 let _ = self.base.set_int32_param(self.params.ts_current_point, 0, current_point as i32);
281
282 if let Some(start) = state.start_time {
284 let elapsed = start.elapsed().as_secs_f64();
285 let _ = self.base.set_float64_param(self.params.ts_elapsed_time, 0, elapsed);
286 }
287
288 let acquiring = state.acquiring;
290 drop(state);
291
292 let _ = self.base.set_int32_param(self.params.ts_acquire, 0, if acquiring { 1 } else { 0 });
293
294 let _ = self.base.call_param_callbacks(0);
296 }
297}
298
299impl PortDriver for TimeSeriesPortDriver {
300 fn base(&self) -> &PortDriverBase {
301 &self.base
302 }
303
304 fn base_mut(&mut self) -> &mut PortDriverBase {
305 &mut self.base
306 }
307
308 fn write_int32(&mut self, user: &mut AsynUser, value: i32) -> asyn_rs::error::AsynResult<()> {
309 let reason = user.reason;
310
311 if reason == self.params.ts_acquire {
312 let mut state = self.shared.lock();
313 if value != 0 {
314 if !state.acquiring {
316 if state.buffers[0].count() == 0 {
318 for buf in state.buffers.iter_mut() {
319 buf.reset();
320 }
321 }
322 state.acquiring = true;
323 state.start_time = Some(Instant::now());
324 }
325 } else {
326 state.acquiring = false;
328 }
329 drop(state);
330 self.base.set_int32_param(reason, 0, value)?;
331 self.base.call_param_callbacks(0)?;
332 } else if reason == self.params.ts_read {
333 self.update_waveform_params();
335 } else if reason == self.params.ts_num_points {
336 let new_size = value.max(1) as usize;
337 let mut state = self.shared.lock();
338 state.num_points = new_size;
339 for buf in state.buffers.iter_mut() {
340 buf.resize(new_size);
341 }
342 state.acquiring = false;
343 drop(state);
344
345 let time_axis: Vec<f64> = (0..new_size).map(|i| i as f64).collect();
347 let _ = self.base.params.set_float64_array(self.params.ts_time_axis, 0, time_axis);
348
349 for i in 0..self.num_channels {
351 let _ = self.base.params.set_float64_array(
352 self.params.ts_channels[i], 0, vec![0.0; new_size],
353 );
354 }
355
356 self.base.set_int32_param(reason, 0, value)?;
357 self.base.set_int32_param(self.params.ts_current_point, 0, 0)?;
358 self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
359 self.base.call_param_callbacks(0)?;
360 } else if reason == self.params.ts_acquire_mode {
361 let mode = if value == 0 {
362 TimeSeriesMode::OneShot
363 } else {
364 TimeSeriesMode::RingBuffer
365 };
366 let mut state = self.shared.lock();
367 state.mode = mode;
368 for buf in state.buffers.iter_mut() {
369 buf.set_mode(mode);
370 }
371 state.acquiring = false;
372 drop(state);
373
374 self.base.set_int32_param(reason, 0, value)?;
375 self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
376 self.base.call_param_callbacks(0)?;
377 } else {
378 self.base.set_int32_param(reason, user.addr, value)?;
380 self.base.call_param_callbacks(user.addr)?;
381 }
382
383 Ok(())
384 }
385
386 fn read_float64_array(
387 &mut self,
388 user: &AsynUser,
389 buf: &mut [f64],
390 ) -> asyn_rs::error::AsynResult<usize> {
391 let data = self.base.params.get_float64_array(user.reason, user.addr)?;
392 let n = data.len().min(buf.len());
393 buf[..n].copy_from_slice(&data[..n]);
394 Ok(n)
395 }
396}
397
398fn ts_data_thread(
400 shared: Arc<Mutex<SharedTsState>>,
401 mut data_rx: TimeSeriesReceiver,
402) {
403 while let Some(data) = data_rx.blocking_recv() {
404 let mut state = shared.lock();
405 if !state.acquiring {
406 continue;
407 }
408 let n = data.values.len().min(state.buffers.len());
409 for i in 0..n {
410 state.buffers[i].add_value(data.values[i]);
411 }
412 if state.mode == TimeSeriesMode::OneShot
414 && state.buffers[0].count() >= state.num_points
415 {
416 state.acquiring = false;
417 }
418 }
419}
420
421pub fn create_ts_port_runtime(
427 port_name: &str,
428 channel_names: &[&str],
429 num_points: usize,
430 data_rx: TimeSeriesReceiver,
431) -> (PortRuntimeHandle, TSParams, std::thread::JoinHandle<()>, std::thread::JoinHandle<()>) {
432 let num_channels = channel_names.len();
433 let shared = Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)));
434
435 let driver = TimeSeriesPortDriver::new(port_name, channel_names, num_points, shared.clone());
436
437 let ts_params = TSParams {
439 ts_acquire: driver.params.ts_acquire,
440 ts_read: driver.params.ts_read,
441 ts_num_points: driver.params.ts_num_points,
442 ts_current_point: driver.params.ts_current_point,
443 ts_time_per_point: driver.params.ts_time_per_point,
444 ts_averaging_time: driver.params.ts_averaging_time,
445 ts_num_average: driver.params.ts_num_average,
446 ts_elapsed_time: driver.params.ts_elapsed_time,
447 ts_acquire_mode: driver.params.ts_acquire_mode,
448 ts_time_axis: driver.params.ts_time_axis,
449 ts_channels: driver.params.ts_channels.clone(),
450 channel_names: driver.params.channel_names.clone(),
451 };
452
453 let (runtime_handle, actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
454
455 let data_jh = std::thread::Builder::new()
457 .name(format!("ts-data-{port_name}"))
458 .spawn(move || {
459 ts_data_thread(shared, data_rx);
460 })
461 .expect("failed to spawn TS data thread");
462
463 (runtime_handle, ts_params, actor_jh, data_jh)
464}
465
466pub fn build_ts_registry(tp: &TSParams) -> ParamRegistry {
470 let mut map: ParamRegistry = HashMap::new();
471
472 map.insert("TSAcquire".into(), ParamInfo::int32(tp.ts_acquire, "TS_ACQUIRE"));
474 map.insert("TSAcquiring".into(), ParamInfo::int32(tp.ts_acquire, "TS_ACQUIRE"));
475 map.insert("TSRead".into(), ParamInfo::int32(tp.ts_read, "TS_READ"));
476 map.insert("TSNumPoints".into(), ParamInfo::int32(tp.ts_num_points, "TS_NUM_POINTS"));
477 map.insert("TSNumPoints_RBV".into(), ParamInfo::int32(tp.ts_num_points, "TS_NUM_POINTS"));
478 map.insert("TSCurrentPoint".into(), ParamInfo::int32(tp.ts_current_point, "TS_CURRENT_POINT"));
479 map.insert("TSTimePerPoint".into(), ParamInfo::float64(tp.ts_time_per_point, "TS_TIME_PER_POINT"));
480 map.insert("TSTimePerPoint_RBV".into(), ParamInfo::float64(tp.ts_time_per_point, "TS_TIME_PER_POINT"));
481 map.insert("TSAveragingTime".into(), ParamInfo::float64(tp.ts_averaging_time, "TS_AVERAGING_TIME"));
482 map.insert("TSAveragingTime_RBV".into(), ParamInfo::float64(tp.ts_averaging_time, "TS_AVERAGING_TIME"));
483 map.insert("TSNumAverage".into(), ParamInfo::int32(tp.ts_num_average, "TS_NUM_AVERAGE"));
484 map.insert("TSElapsedTime".into(), ParamInfo::float64(tp.ts_elapsed_time, "TS_ELAPSED_TIME"));
485 map.insert("TSAcquireMode".into(), ParamInfo::int32(tp.ts_acquire_mode, "TS_ACQUIRE_MODE"));
486 map.insert("TSAcquireMode_RBV".into(), ParamInfo::int32(tp.ts_acquire_mode, "TS_ACQUIRE_MODE"));
487
488 map.insert("TSTimeAxis".into(), ParamInfo::float64_array(tp.ts_time_axis, "TS_TIME_AXIS"));
490
491 for (i, name) in tp.channel_names.iter().enumerate() {
493 let drv_info = format!("TS_CHAN_{name}");
494 map.insert(name.clone(), ParamInfo::float64_array(tp.ts_channels[i], &drv_info));
495 }
496
497 map
498}
499
500#[cfg(test)]
501mod tests {
502 use super::*;
503
504 #[test]
505 fn test_one_shot() {
506 let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
507 for i in 0..5 {
508 ts.add_value(i as f64);
509 }
510 assert_eq!(ts.count(), 5);
511 assert_eq!(ts.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);
512
513 ts.add_value(99.0);
515 assert_eq!(ts.count(), 5);
516 }
517
518 #[test]
519 fn test_ring_buffer() {
520 let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
521 for i in 0..6 {
522 ts.add_value(i as f64);
523 }
524 assert_eq!(ts.count(), 4);
525 assert_eq!(ts.values(), vec![2.0, 3.0, 4.0, 5.0]);
527 }
528
529 #[test]
530 fn test_ring_buffer_partial() {
531 let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
532 ts.add_value(10.0);
533 ts.add_value(20.0);
534 assert_eq!(ts.count(), 2);
535 assert_eq!(ts.values(), vec![10.0, 20.0]);
536 }
537
538 #[test]
539 fn test_reset() {
540 let mut ts = TimeSeries::new(3, TimeSeriesMode::OneShot);
541 ts.add_value(1.0);
542 ts.add_value(2.0);
543 ts.reset();
544 assert_eq!(ts.count(), 0);
545 assert!(ts.values().is_empty());
546 }
547
548 #[test]
549 fn test_resize() {
550 let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
551 ts.add_value(1.0);
552 ts.add_value(2.0);
553 ts.resize(3);
554 assert_eq!(ts.num_points, 3);
555 assert_eq!(ts.count(), 0);
556 assert!(ts.values().is_empty());
557 }
558
559 #[test]
560 fn test_set_mode() {
561 let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
562 ts.add_value(1.0);
563 ts.set_mode(TimeSeriesMode::RingBuffer);
564 assert_eq!(ts.mode, TimeSeriesMode::RingBuffer);
565 assert_eq!(ts.count(), 0);
566 }
567
568 const TEST_CHANNELS: [&str; 3] = ["ChA", "ChB", "ChC"];
571
572 #[test]
573 fn test_shared_ts_state_init() {
574 let state = SharedTsState::new(3, 100);
575 assert_eq!(state.buffers.len(), 3);
576 assert_eq!(state.num_points, 100);
577 assert!(!state.acquiring);
578 assert_eq!(state.mode, TimeSeriesMode::OneShot);
579 }
580
581 #[test]
582 fn test_ts_port_driver_create() {
583 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
584 let driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared);
585 assert_eq!(driver.base().port_name, "TEST_TS");
586 assert_eq!(driver.num_channels, 3);
587 assert!(!driver.base().flags.multi_device);
588 }
589
590 #[test]
591 fn test_ts_port_driver_write_acquire() {
592 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
593 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
594
595 let mut user = AsynUser::new(driver.params.ts_acquire);
597 driver.write_int32(&mut user, 1).unwrap();
598 assert!(shared.lock().acquiring);
599
600 driver.write_int32(&mut user, 0).unwrap();
602 assert!(!shared.lock().acquiring);
603 }
604
605 #[test]
606 fn test_ts_port_driver_write_num_points() {
607 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
608 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
609
610 let mut user = AsynUser::new(driver.params.ts_num_points);
611 driver.write_int32(&mut user, 50).unwrap();
612
613 let state = shared.lock();
614 assert_eq!(state.num_points, 50);
615 for buf in &state.buffers {
616 assert_eq!(buf.num_points, 50);
617 }
618 }
619
620 #[test]
621 fn test_ts_port_driver_write_mode() {
622 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
623 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
624
625 let mut user = AsynUser::new(driver.params.ts_acquire_mode);
626 driver.write_int32(&mut user, 1).unwrap();
627
628 let state = shared.lock();
629 assert_eq!(state.mode, TimeSeriesMode::RingBuffer);
630 for buf in &state.buffers {
631 assert_eq!(buf.mode, TimeSeriesMode::RingBuffer);
632 }
633 }
634
635 #[test]
636 fn test_ts_port_driver_update_waveforms() {
637 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
638 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 10, shared.clone());
639
640 {
642 let mut state = shared.lock();
643 state.acquiring = true;
644 state.start_time = Some(Instant::now());
645 for buf in state.buffers.iter_mut() {
646 buf.add_value(42.0);
647 buf.add_value(43.0);
648 }
649 }
650
651 driver.update_waveform_params();
653
654 let cp = driver.base.get_int32_param(driver.params.ts_current_point, 0).unwrap();
656 assert_eq!(cp, 2);
657
658 let data = driver.base.params.get_float64_array(driver.params.ts_channels[0], 0).unwrap();
660 assert_eq!(data[0], 42.0);
661 assert_eq!(data[1], 43.0);
662 }
663
664 #[test]
665 fn test_ts_port_driver_read_array() {
666 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 5)));
667 let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 5, shared);
668
669 let user = AsynUser::new(driver.params.ts_time_axis);
670 let mut buf = vec![0.0; 5];
671 let n = driver.read_float64_array(&user, &mut buf).unwrap();
672 assert_eq!(n, 5);
673 assert_eq!(buf, vec![0.0, 1.0, 2.0, 3.0, 4.0]);
674 }
675
676 #[test]
677 fn test_ts_data_ingestion_oneshot() {
678 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 3)));
679 let (tx, rx) = tokio::sync::mpsc::channel(16);
680
681 shared.lock().acquiring = true;
683
684 let shared_clone = shared.clone();
685 let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));
686
687 tx.blocking_send(TimeSeriesData { values: vec![1.0, 10.0, 100.0] }).unwrap();
689 tx.blocking_send(TimeSeriesData { values: vec![2.0, 20.0, 200.0] }).unwrap();
690 tx.blocking_send(TimeSeriesData { values: vec![3.0, 30.0, 300.0] }).unwrap();
691 tx.blocking_send(TimeSeriesData { values: vec![4.0, 40.0, 400.0] }).unwrap(); drop(tx);
695 jh.join().unwrap();
696
697 let state = shared.lock();
698 assert_eq!(state.buffers[0].count(), 3);
699 assert_eq!(state.buffers[0].values(), vec![1.0, 2.0, 3.0]);
700 assert_eq!(state.buffers[1].values(), vec![10.0, 20.0, 30.0]);
701 assert_eq!(state.buffers[2].values(), vec![100.0, 200.0, 300.0]);
702 assert!(!state.acquiring); }
704
705 #[test]
706 fn test_ts_data_ingestion_not_acquiring() {
707 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
708 let (tx, rx) = tokio::sync::mpsc::channel(16);
709
710 let shared_clone = shared.clone();
712 let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));
713
714 tx.blocking_send(TimeSeriesData { values: vec![1.0, 2.0, 3.0] }).unwrap();
715
716 drop(tx);
717 jh.join().unwrap();
718
719 let state = shared.lock();
720 assert_eq!(state.buffers[0].count(), 0);
721 }
722
723 #[test]
724 fn test_build_ts_registry() {
725 let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
726 let driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 10, shared);
727 let registry = build_ts_registry(&driver.params);
728
729 assert!(registry.contains_key("TSAcquire"));
731 assert!(registry.contains_key("TSAcquiring"));
732 assert!(registry.contains_key("TSRead"));
733 assert!(registry.contains_key("TSNumPoints"));
734 assert!(registry.contains_key("TSCurrentPoint"));
735 assert!(registry.contains_key("TSAcquireMode"));
736
737 assert!(registry.contains_key("ChA"));
739 assert!(registry.contains_key("ChB"));
740 assert!(registry.contains_key("ChC"));
741 assert!(registry.contains_key("TSTimeAxis"));
742
743 assert_eq!(registry.len(), 18);
745 }
746
747 #[test]
748 fn test_build_ts_registry_stats_channels() {
749 let shared = Arc::new(Mutex::new(SharedTsState::new(NUM_STATS_TS_CHANNELS, 10)));
750 let driver = TimeSeriesPortDriver::new("STATS_TS", &STATS_TS_CHANNEL_NAMES, 10, shared);
751 let registry = build_ts_registry(&driver.params);
752
753 assert_eq!(registry.len(), 38);
755 assert!(registry.contains_key("TSMinValue"));
756 assert!(registry.contains_key("TSTimestamp"));
757 }
758
759 #[test]
760 fn test_create_ts_port_runtime() {
761 let (_tx, rx) = tokio::sync::mpsc::channel(16);
762 let (handle, params, _actor_jh, _data_jh) =
763 create_ts_port_runtime("TEST_TS_RT", &TEST_CHANNELS, 100, rx);
764 assert_eq!(handle.port_name(), "TEST_TS_RT");
765 assert_eq!(params.ts_channels.len(), 3);
766 handle.shutdown();
767 }
768}