Skip to main content

ad_plugins_rs/
time_series.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use asyn_rs::param::ParamType;
5use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
6use asyn_rs::runtime::config::RuntimeConfig;
7use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
8use asyn_rs::user::AsynUser;
9use parking_lot::Mutex;
10
11// ===== Stats-specific channel definitions =====
12
/// Number of stats channels in the time series.
///
/// Also fixes the length of [`STATS_TS_CHANNEL_NAMES`], so adding or removing a
/// name without updating this constant is a compile error.
pub const NUM_STATS_TS_CHANNELS: usize = 23;

/// Channel names for the 23 NDStats time series channels.
///
/// NOTE(review): the order presumably has to match the order in which the stats
/// producer fills `TimeSeriesData::values` — confirm against the producer.
pub const STATS_TS_CHANNEL_NAMES: [&str; NUM_STATS_TS_CHANNELS] = [
    "TSMinValue",
    "TSMinX",
    "TSMinY",
    "TSMaxValue",
    "TSMaxX",
    "TSMaxY",
    "TSMeanValue",
    "TSSigma",
    "TSTotal",
    "TSNet",
    "TSCentroidTotal",
    "TSCentroidX",
    "TSCentroidY",
    "TSSigmaX",
    "TSSigmaY",
    "TSSigmaXY",
    "TSSkewX",
    "TSSkewY",
    "TSKurtosisX",
    "TSKurtosisY",
    "TSEccentricity",
    "TSOrientation",
    "TSTimestamp",
];
42
43// ===== Generic time series data =====
44
/// Shared data pushed from a plugin processor to a TS port driver.
///
/// `values` carries one sample per channel, matched to channels by index.
/// Producers should send exactly as many values as the driver has channels:
/// the ingestion thread in this file pairs values with channel buffers
/// positionally and stops at the shorter of the two, so surplus values are
/// ignored and missing ones leave their channels without a new sample.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct TimeSeriesData {
    /// One sample per channel, in channel order.
    pub values: Vec<f64>,
}
50
/// Sender from plugin -> TS port.
///
/// Sending half of a tokio mpsc channel carrying [`TimeSeriesData`] samples.
pub type TimeSeriesSender = tokio::sync::mpsc::Sender<TimeSeriesData>;
/// Receiver in TS port background thread.
///
/// Receiving half of the same channel; the ingestion thread in this file drains
/// it with `blocking_recv` until every sender has been dropped.
pub type TimeSeriesReceiver = tokio::sync::mpsc::Receiver<TimeSeriesData>;
55
56/// Registry for pending TS receivers, keyed by upstream plugin port name.
57/// NDStatsConfigure etc. store receivers here; NDTimeSeriesConfigure picks them up.
58pub struct TsReceiverRegistry {
59    inner: std::sync::Mutex<std::collections::HashMap<String, (TimeSeriesReceiver, Vec<String>)>>,
60}
61
62impl TsReceiverRegistry {
63    pub fn new() -> Self {
64        Self {
65            inner: std::sync::Mutex::new(std::collections::HashMap::new()),
66        }
67    }
68
69    /// Store a receiver and its channel names for a given upstream port.
70    pub fn store(
71        &self,
72        upstream_port: &str,
73        receiver: TimeSeriesReceiver,
74        channel_names: Vec<String>,
75    ) {
76        let mut map = self.inner.lock().unwrap();
77        map.insert(upstream_port.to_string(), (receiver, channel_names));
78    }
79
80    /// Take a receiver for the given upstream port (returns None if not found or already taken).
81    pub fn take(&self, upstream_port: &str) -> Option<(TimeSeriesReceiver, Vec<String>)> {
82        let mut map = self.inner.lock().unwrap();
83        map.remove(upstream_port)
84    }
85}
86
87impl Default for TsReceiverRegistry {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
/// Accumulation mode for time series.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeSeriesMode {
    /// Fill the buffer once; samples arriving after it is full are dropped.
    OneShot,
    /// Keep the most recent `num_points` samples, overwriting the oldest.
    RingBuffer,
}

/// Time-series accumulator: stores scalar/1D values from successive arrays.
///
/// The buffer holds `num_points` slots. How it fills is decided by `mode`:
/// see [`TimeSeriesMode`]. `num_points` and `mode` are public for reading;
/// change them through [`TimeSeries::resize`] and [`TimeSeries::set_mode`] so
/// the internal buffer stays consistent with them.
#[derive(Debug, Clone)]
pub struct TimeSeries {
    /// Capacity of the series in samples.
    pub num_points: usize,
    /// Current accumulation mode.
    pub mode: TimeSeriesMode,
    /// Sample storage, always allocated with `num_points` slots.
    buffer: Vec<f64>,
    /// Next write position. In ring mode it is reduced modulo `num_points` to
    /// find the slot and is kept below `2 * num_points`.
    write_pos: usize,
    /// Number of valid samples currently held (never exceeds `num_points`).
    count: usize,
}

impl TimeSeries {
    /// Creates an empty series with room for `num_points` samples.
    pub fn new(num_points: usize, mode: TimeSeriesMode) -> Self {
        Self {
            num_points,
            mode,
            buffer: vec![0.0; num_points],
            write_pos: 0,
            count: 0,
        }
    }

    /// Add a value (e.g., mean of an array) to the time series.
    ///
    /// * `OneShot`: appends until the buffer is full, then ignores the value.
    /// * `RingBuffer`: overwrites the oldest sample once the buffer is full.
    ///
    /// A series with zero capacity ignores every value in both modes.
    pub fn add_value(&mut self, value: f64) {
        // A zero-capacity series can never hold a sample. Bail out before the
        // ring-buffer arm evaluates `write_pos % 0`, which would panic.
        if self.num_points == 0 {
            return;
        }
        match self.mode {
            TimeSeriesMode::OneShot => {
                if self.write_pos < self.num_points {
                    self.buffer[self.write_pos] = value;
                    self.write_pos += 1;
                    self.count = self.write_pos;
                }
            }
            TimeSeriesMode::RingBuffer => {
                self.buffer[self.write_pos % self.num_points] = value;
                self.write_pos += 1;
                // Keep the position bounded so a long-running ring buffer cannot
                // overflow `usize`. Subtracting one full lap preserves both
                // `write_pos % num_points` and "at least one lap completed".
                if self.write_pos >= self.num_points.saturating_mul(2) {
                    self.write_pos -= self.num_points;
                }
                self.count = self.count.max(self.write_pos.min(self.num_points));
            }
        }
    }

    /// Get the accumulated values in order (oldest first).
    ///
    /// The result has `count()` elements. In ring mode, once the buffer has
    /// wrapped, the storage is rotated so the oldest sample comes first.
    pub fn values(&self) -> Vec<f64> {
        match self.mode {
            TimeSeriesMode::OneShot => self.buffer[..self.count].to_vec(),
            TimeSeriesMode::RingBuffer => {
                if self.write_pos <= self.num_points {
                    // Not wrapped yet (or exactly on a lap boundary): storage
                    // order is already chronological.
                    self.buffer[..self.count].to_vec()
                } else {
                    let start = self.write_pos % self.num_points;
                    let mut result = Vec::with_capacity(self.num_points);
                    result.extend_from_slice(&self.buffer[start..]);
                    result.extend_from_slice(&self.buffer[..start]);
                    result
                }
            }
        }
    }

    /// Number of valid samples currently stored.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Discards all samples, keeping capacity and mode.
    pub fn reset(&mut self) {
        self.buffer.fill(0.0);
        self.write_pos = 0;
        self.count = 0;
    }

    /// Resize the buffer. Resets all data.
    pub fn resize(&mut self, num_points: usize) {
        self.num_points = num_points;
        self.buffer = vec![0.0; num_points];
        self.write_pos = 0;
        self.count = 0;
    }

    /// Change the accumulation mode. Resets all data.
    pub fn set_mode(&mut self, mode: TimeSeriesMode) {
        self.mode = mode;
        self.reset();
    }
}
180
181// ===== Time Series Port Driver =====
182
/// Param indices for the TS port.
///
/// Each field holds the index returned when the corresponding parameter was
/// registered on the port. The type is plain data, so it derives `Clone` and
/// `Debug` to let callers keep a copy after the driver has been moved into
/// its runtime.
#[derive(Debug, Clone)]
pub struct TSParams {
    /// `TS_ACQUIRE` (Int32): non-zero starts acquisition, zero stops it.
    pub ts_acquire: usize,
    /// `TS_READ` (Int32): writing it refreshes the waveform parameters.
    pub ts_read: usize,
    /// `TS_NUM_POINTS` (Int32): number of points per channel buffer.
    pub ts_num_points: usize,
    /// `TS_CURRENT_POINT` (Int32): number of samples currently accumulated.
    pub ts_current_point: usize,
    /// `TS_TIME_PER_POINT` (Float64).
    pub ts_time_per_point: usize,
    /// `TS_AVERAGING_TIME` (Float64).
    pub ts_averaging_time: usize,
    /// `TS_NUM_AVERAGE` (Int32).
    pub ts_num_average: usize,
    /// `TS_ELAPSED_TIME` (Float64): seconds since acquisition was started.
    pub ts_elapsed_time: usize,
    /// `TS_ACQUIRE_MODE` (Int32): zero selects one-shot, non-zero ring buffer.
    pub ts_acquire_mode: usize,
    /// `TS_TIME_AXIS` (Float64Array).
    pub ts_time_axis: usize,
    /// Per-channel waveform param indices (length = num_channels).
    pub ts_channels: Vec<usize>,
    /// Channel names (kept for registry building).
    pub channel_names: Vec<String>,
}
200
201/// Shared state between the data ingestion thread and the TS port driver.
202pub struct SharedTsState {
203    pub buffers: Vec<TimeSeries>,
204    pub acquiring: bool,
205    pub start_time: Option<Instant>,
206    pub num_points: usize,
207    pub mode: TimeSeriesMode,
208}
209
210impl SharedTsState {
211    fn new(num_channels: usize, num_points: usize) -> Self {
212        let buffers = (0..num_channels)
213            .map(|_| TimeSeries::new(num_points, TimeSeriesMode::OneShot))
214            .collect();
215        Self {
216            buffers,
217            acquiring: false,
218            start_time: None,
219            num_points,
220            mode: TimeSeriesMode::OneShot,
221        }
222    }
223}
224
/// TS port driver: standalone asyn PortDriver for time series waveforms.
///
/// Generic over the number of channels — Stats uses 23, ROIStat uses
/// a different set, and NDTimeSeries standalone can use any count.
pub struct TimeSeriesPortDriver {
    /// Underlying port base that owns the parameter storage.
    base: PortDriverBase,
    /// Indices of the parameters registered in `new`.
    params: TSParams,
    /// Accumulation state shared with the data ingestion thread.
    shared: Arc<Mutex<SharedTsState>>,
    /// Number of channels, i.e. the length of the channel name list given to `new`.
    num_channels: usize,
}
235
236impl TimeSeriesPortDriver {
237    fn new(
238        port_name: &str,
239        channel_names: &[&str],
240        num_points: usize,
241        shared: Arc<Mutex<SharedTsState>>,
242    ) -> Self {
243        let num_channels = channel_names.len();
244        let mut base = PortDriverBase::new(
245            port_name,
246            1,
247            PortFlags {
248                multi_device: false,
249                can_block: false,
250                destructible: true,
251            },
252        );
253
254        // NDPluginBase params (NDTimeSeries.template includes NDPluginBase.template)
255        let _ = ad_core_rs::params::ndarray_driver::NDArrayDriverParams::create(&mut base);
256        let _ = ad_core_rs::plugin::params::PluginBaseParams::create(&mut base);
257
258        // Register control params
259        let ts_acquire = base.create_param("TS_ACQUIRE", ParamType::Int32).unwrap();
260        let _ = base.set_int32_param(ts_acquire, 0, 0);
261        let ts_read = base.create_param("TS_READ", ParamType::Int32).unwrap();
262        let ts_num_points = base
263            .create_param("TS_NUM_POINTS", ParamType::Int32)
264            .unwrap();
265        let _ = base.set_int32_param(ts_num_points, 0, num_points as i32);
266        let ts_current_point = base
267            .create_param("TS_CURRENT_POINT", ParamType::Int32)
268            .unwrap();
269        let _ = base.set_int32_param(ts_current_point, 0, 0);
270        let ts_time_per_point = base
271            .create_param("TS_TIME_PER_POINT", ParamType::Float64)
272            .unwrap();
273        let ts_averaging_time = base
274            .create_param("TS_AVERAGING_TIME", ParamType::Float64)
275            .unwrap();
276        let ts_num_average = base
277            .create_param("TS_NUM_AVERAGE", ParamType::Int32)
278            .unwrap();
279        let _ = base.set_int32_param(ts_num_average, 0, 1);
280        let ts_elapsed_time = base
281            .create_param("TS_ELAPSED_TIME", ParamType::Float64)
282            .unwrap();
283        let ts_acquire_mode = base
284            .create_param("TS_ACQUIRE_MODE", ParamType::Int32)
285            .unwrap();
286        let _ = base.set_int32_param(ts_acquire_mode, 0, 0);
287        let ts_time_axis = base
288            .create_param("TS_TIME_AXIS", ParamType::Float64Array)
289            .unwrap();
290
291        // Initialize time axis
292        let time_axis: Vec<f64> = (0..num_points).map(|i| i as f64).collect();
293        let _ = base.params.set_float64_array(ts_time_axis, 0, time_axis);
294
295        // Channel waveform params — one Float64Array per channel
296        let mut ts_channels = Vec::with_capacity(num_channels);
297        for name in channel_names {
298            let param_name = format!("TS_CHAN_{name}");
299            let idx = base
300                .create_param(&param_name, ParamType::Float64Array)
301                .unwrap();
302            let _ = base.params.set_float64_array(idx, 0, vec![0.0; num_points]);
303            ts_channels.push(idx);
304        }
305
306        let params = TSParams {
307            ts_acquire,
308            ts_read,
309            ts_num_points,
310            ts_current_point,
311            ts_time_per_point,
312            ts_averaging_time,
313            ts_num_average,
314            ts_elapsed_time,
315            ts_acquire_mode,
316            ts_time_axis,
317            ts_channels,
318            channel_names: channel_names.iter().map(|s| s.to_string()).collect(),
319        };
320
321        Self {
322            base,
323            params,
324            shared,
325            num_channels,
326        }
327    }
328
329    /// Copy buffer data to Float64Array params and call callbacks.
330    fn update_waveform_params(&mut self) {
331        let state = self.shared.lock();
332        let num_points = state.num_points;
333
334        // Update per-channel waveform params
335        for (i, buf) in state.buffers.iter().enumerate() {
336            let mut values = buf.values();
337            values.resize(num_points, 0.0);
338            let _ = self
339                .base
340                .params
341                .set_float64_array(self.params.ts_channels[i], 0, values);
342        }
343
344        // Update current point
345        let current_point = state.buffers[0].count();
346        let _ = self
347            .base
348            .set_int32_param(self.params.ts_current_point, 0, current_point as i32);
349
350        // Update elapsed time
351        if let Some(start) = state.start_time {
352            let elapsed = start.elapsed().as_secs_f64();
353            let _ = self
354                .base
355                .set_float64_param(self.params.ts_elapsed_time, 0, elapsed);
356        }
357
358        // Update acquire status (may have auto-stopped)
359        let acquiring = state.acquiring;
360        drop(state);
361
362        let _ = self
363            .base
364            .set_int32_param(self.params.ts_acquire, 0, if acquiring { 1 } else { 0 });
365
366        // Notify listeners
367        let _ = self.base.call_param_callbacks(0);
368    }
369}
370
371impl PortDriver for TimeSeriesPortDriver {
    /// Returns a shared reference to the embedded `PortDriverBase`.
    fn base(&self) -> &PortDriverBase {
        &self.base
    }

    /// Returns a mutable reference to the embedded `PortDriverBase`.
    fn base_mut(&mut self) -> &mut PortDriverBase {
        &mut self.base
    }
379
380    fn write_int32(&mut self, user: &mut AsynUser, value: i32) -> asyn_rs::error::AsynResult<()> {
381        let reason = user.reason;
382
383        if reason == self.params.ts_acquire {
384            let mut state = self.shared.lock();
385            if value != 0 {
386                // Start acquiring
387                if !state.acquiring {
388                    // If buffers are empty, this is Erase/Start
389                    if state.buffers[0].count() == 0 {
390                        for buf in state.buffers.iter_mut() {
391                            buf.reset();
392                        }
393                    }
394                    state.acquiring = true;
395                    state.start_time = Some(Instant::now());
396                }
397            } else {
398                // Stop
399                state.acquiring = false;
400            }
401            drop(state);
402            self.base.set_int32_param(reason, 0, value)?;
403            self.base.call_param_callbacks(0)?;
404        } else if reason == self.params.ts_read {
405            // Trigger waveform update
406            self.update_waveform_params();
407        } else if reason == self.params.ts_num_points {
408            let new_size = value.max(1) as usize;
409            let mut state = self.shared.lock();
410            state.num_points = new_size;
411            for buf in state.buffers.iter_mut() {
412                buf.resize(new_size);
413            }
414            state.acquiring = false;
415            drop(state);
416
417            // Update time axis
418            let time_axis: Vec<f64> = (0..new_size).map(|i| i as f64).collect();
419            let _ = self
420                .base
421                .params
422                .set_float64_array(self.params.ts_time_axis, 0, time_axis);
423
424            // Re-initialize channel waveforms
425            for i in 0..self.num_channels {
426                let _ = self.base.params.set_float64_array(
427                    self.params.ts_channels[i],
428                    0,
429                    vec![0.0; new_size],
430                );
431            }
432
433            self.base.set_int32_param(reason, 0, value)?;
434            self.base
435                .set_int32_param(self.params.ts_current_point, 0, 0)?;
436            self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
437            self.base.call_param_callbacks(0)?;
438        } else if reason == self.params.ts_acquire_mode {
439            let mode = if value == 0 {
440                TimeSeriesMode::OneShot
441            } else {
442                TimeSeriesMode::RingBuffer
443            };
444            let mut state = self.shared.lock();
445            state.mode = mode;
446            for buf in state.buffers.iter_mut() {
447                buf.set_mode(mode);
448            }
449            state.acquiring = false;
450            drop(state);
451
452            self.base.set_int32_param(reason, 0, value)?;
453            self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
454            self.base.call_param_callbacks(0)?;
455        } else {
456            // Default: store in param cache
457            self.base.set_int32_param(reason, user.addr, value)?;
458            self.base.call_param_callbacks(user.addr)?;
459        }
460
461        Ok(())
462    }
463
464    fn read_float64_array(
465        &mut self,
466        user: &AsynUser,
467        buf: &mut [f64],
468    ) -> asyn_rs::error::AsynResult<usize> {
469        let data = self.base.params.get_float64_array(user.reason, user.addr)?;
470        let n = data.len().min(buf.len());
471        buf[..n].copy_from_slice(&data[..n]);
472        Ok(n)
473    }
474}
475
476/// Background thread that receives data from a plugin and accumulates into shared buffers.
477fn ts_data_thread(shared: Arc<Mutex<SharedTsState>>, mut data_rx: TimeSeriesReceiver) {
478    while let Some(data) = data_rx.blocking_recv() {
479        let mut state = shared.lock();
480        if !state.acquiring {
481            continue;
482        }
483        let n = data.values.len().min(state.buffers.len());
484        for i in 0..n {
485            state.buffers[i].add_value(data.values[i]);
486        }
487        // Auto-stop for OneShot mode
488        if state.mode == TimeSeriesMode::OneShot && state.buffers[0].count() >= state.num_points {
489            state.acquiring = false;
490        }
491    }
492}
493
494/// Create a TS port runtime.
495///
496/// `channel_names` defines the number and names of time series channels.
497/// Returns the port runtime handle, the TS params (for building a registry),
498/// and thread join handles for the actor and data ingestion threads.
499pub fn create_ts_port_runtime(
500    port_name: &str,
501    channel_names: &[&str],
502    num_points: usize,
503    data_rx: TimeSeriesReceiver,
504) -> (
505    PortRuntimeHandle,
506    TSParams,
507    std::thread::JoinHandle<()>,
508    std::thread::JoinHandle<()>,
509) {
510    let num_channels = channel_names.len();
511    let shared = Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)));
512
513    let driver = TimeSeriesPortDriver::new(port_name, channel_names, num_points, shared.clone());
514
515    // Capture params before the driver is moved into the actor
516    let ts_params = TSParams {
517        ts_acquire: driver.params.ts_acquire,
518        ts_read: driver.params.ts_read,
519        ts_num_points: driver.params.ts_num_points,
520        ts_current_point: driver.params.ts_current_point,
521        ts_time_per_point: driver.params.ts_time_per_point,
522        ts_averaging_time: driver.params.ts_averaging_time,
523        ts_num_average: driver.params.ts_num_average,
524        ts_elapsed_time: driver.params.ts_elapsed_time,
525        ts_acquire_mode: driver.params.ts_acquire_mode,
526        ts_time_axis: driver.params.ts_time_axis,
527        ts_channels: driver.params.ts_channels.clone(),
528        channel_names: driver.params.channel_names.clone(),
529    };
530
531    let (runtime_handle, actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
532
533    // Spawn data ingestion thread
534    let data_jh = std::thread::Builder::new()
535        .name(format!("ts-data-{port_name}"))
536        .spawn(move || {
537            ts_data_thread(shared, data_rx);
538        })
539        .expect("failed to spawn TS data thread");
540
541    (runtime_handle, ts_params, actor_jh, data_jh)
542}
543
// Unit tests: the first group exercises the `TimeSeries` accumulator on its own,
// the second group exercises the port driver, the ingestion thread and the
// runtime constructor.
#[cfg(test)]
mod tests {
    use super::*;

    /// One-shot mode fills up to capacity and ignores anything after that.
    #[test]
    fn test_one_shot() {
        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
        for i in 0..5 {
            ts.add_value(i as f64);
        }
        assert_eq!(ts.count(), 5);
        assert_eq!(ts.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);

        // Adding beyond capacity is a no-op
        ts.add_value(99.0);
        assert_eq!(ts.count(), 5);
    }

    /// Ring-buffer mode keeps the newest samples and returns them oldest first.
    #[test]
    fn test_ring_buffer() {
        let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
        for i in 0..6 {
            ts.add_value(i as f64);
        }
        assert_eq!(ts.count(), 4);
        // Should contain [2, 3, 4, 5] in order
        assert_eq!(ts.values(), vec![2.0, 3.0, 4.0, 5.0]);
    }

    /// A ring buffer that has not wrapped yet returns only what was written.
    #[test]
    fn test_ring_buffer_partial() {
        let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
        ts.add_value(10.0);
        ts.add_value(20.0);
        assert_eq!(ts.count(), 2);
        assert_eq!(ts.values(), vec![10.0, 20.0]);
    }

    /// `reset` discards all accumulated samples.
    #[test]
    fn test_reset() {
        let mut ts = TimeSeries::new(3, TimeSeriesMode::OneShot);
        ts.add_value(1.0);
        ts.add_value(2.0);
        ts.reset();
        assert_eq!(ts.count(), 0);
        assert!(ts.values().is_empty());
    }

    /// `resize` changes the capacity and discards all accumulated samples.
    #[test]
    fn test_resize() {
        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
        ts.add_value(1.0);
        ts.add_value(2.0);
        ts.resize(3);
        assert_eq!(ts.num_points, 3);
        assert_eq!(ts.count(), 0);
        assert!(ts.values().is_empty());
    }

    /// `set_mode` switches the mode and discards all accumulated samples.
    #[test]
    fn test_set_mode() {
        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
        ts.add_value(1.0);
        ts.set_mode(TimeSeriesMode::RingBuffer);
        assert_eq!(ts.mode, TimeSeriesMode::RingBuffer);
        assert_eq!(ts.count(), 0);
    }

    // --- TS port driver tests (using a small channel set for simplicity) ---

    const TEST_CHANNELS: [&str; 3] = ["ChA", "ChB", "ChC"];

    /// Freshly created shared state is idle, one-shot, and sized as requested.
    #[test]
    fn test_shared_ts_state_init() {
        let state = SharedTsState::new(3, 100);
        assert_eq!(state.buffers.len(), 3);
        assert_eq!(state.num_points, 100);
        assert!(!state.acquiring);
        assert_eq!(state.mode, TimeSeriesMode::OneShot);
    }

    /// The driver records its port name, channel count and single-device flag.
    #[test]
    fn test_ts_port_driver_create() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared);
        assert_eq!(driver.base().port_name, "TEST_TS");
        assert_eq!(driver.num_channels, 3);
        assert!(!driver.base().flags.multi_device);
    }

    /// Writing TS_ACQUIRE toggles the shared `acquiring` flag.
    #[test]
    fn test_ts_port_driver_write_acquire() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());

        // Start acquiring
        let mut user = AsynUser::new(driver.params.ts_acquire);
        driver.write_int32(&mut user, 1).unwrap();
        assert!(shared.lock().acquiring);

        // Stop acquiring
        driver.write_int32(&mut user, 0).unwrap();
        assert!(!shared.lock().acquiring);
    }

    /// Writing TS_NUM_POINTS resizes the shared state and every channel buffer.
    #[test]
    fn test_ts_port_driver_write_num_points() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());

        let mut user = AsynUser::new(driver.params.ts_num_points);
        driver.write_int32(&mut user, 50).unwrap();

        let state = shared.lock();
        assert_eq!(state.num_points, 50);
        for buf in &state.buffers {
            assert_eq!(buf.num_points, 50);
        }
    }

    /// Writing a non-zero TS_ACQUIRE_MODE switches everything to ring-buffer mode.
    #[test]
    fn test_ts_port_driver_write_mode() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());

        let mut user = AsynUser::new(driver.params.ts_acquire_mode);
        driver.write_int32(&mut user, 1).unwrap();

        let state = shared.lock();
        assert_eq!(state.mode, TimeSeriesMode::RingBuffer);
        for buf in &state.buffers {
            assert_eq!(buf.mode, TimeSeriesMode::RingBuffer);
        }
    }

    /// `update_waveform_params` publishes the point count and the channel data.
    #[test]
    fn test_ts_port_driver_update_waveforms() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 10, shared.clone());

        // Add some data
        {
            let mut state = shared.lock();
            state.acquiring = true;
            state.start_time = Some(Instant::now());
            for buf in state.buffers.iter_mut() {
                buf.add_value(42.0);
                buf.add_value(43.0);
            }
        }

        // Trigger update
        driver.update_waveform_params();

        // Check current point was updated
        let cp = driver
            .base
            .get_int32_param(driver.params.ts_current_point, 0)
            .unwrap();
        assert_eq!(cp, 2);

        // Check waveform data was written
        let data = driver
            .base
            .params
            .get_float64_array(driver.params.ts_channels[0], 0)
            .unwrap();
        assert_eq!(data[0], 42.0);
        assert_eq!(data[1], 43.0);
    }

    /// Reading the time axis returns the index ramp seeded at construction.
    #[test]
    fn test_ts_port_driver_read_array() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 5)));
        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 5, shared);

        let user = AsynUser::new(driver.params.ts_time_axis);
        let mut buf = vec![0.0; 5];
        let n = driver.read_float64_array(&user, &mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(buf, vec![0.0, 1.0, 2.0, 3.0, 4.0]);
    }

    /// The ingestion thread fills one-shot buffers, drops the overflow sample,
    /// and clears `acquiring` once the buffers are full.
    #[test]
    fn test_ts_data_ingestion_oneshot() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 3)));
        let (tx, rx) = tokio::sync::mpsc::channel(16);

        // Start acquiring
        shared.lock().acquiring = true;

        let shared_clone = shared.clone();
        let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));

        // Send data
        tx.blocking_send(TimeSeriesData {
            values: vec![1.0, 10.0, 100.0],
        })
        .unwrap();
        tx.blocking_send(TimeSeriesData {
            values: vec![2.0, 20.0, 200.0],
        })
        .unwrap();
        tx.blocking_send(TimeSeriesData {
            values: vec![3.0, 30.0, 300.0],
        })
        .unwrap();
        tx.blocking_send(TimeSeriesData {
            values: vec![4.0, 40.0, 400.0],
        })
        .unwrap(); // beyond capacity

        // Close channel and wait for thread
        drop(tx);
        jh.join().unwrap();

        let state = shared.lock();
        assert_eq!(state.buffers[0].count(), 3);
        assert_eq!(state.buffers[0].values(), vec![1.0, 2.0, 3.0]);
        assert_eq!(state.buffers[1].values(), vec![10.0, 20.0, 30.0]);
        assert_eq!(state.buffers[2].values(), vec![100.0, 200.0, 300.0]);
        assert!(!state.acquiring); // auto-stopped
    }

    /// Samples that arrive while not acquiring are discarded.
    #[test]
    fn test_ts_data_ingestion_not_acquiring() {
        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
        let (tx, rx) = tokio::sync::mpsc::channel(16);

        // Not acquiring (default)
        let shared_clone = shared.clone();
        let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));

        tx.blocking_send(TimeSeriesData {
            values: vec![1.0, 2.0, 3.0],
        })
        .unwrap();

        drop(tx);
        jh.join().unwrap();

        let state = shared.lock();
        assert_eq!(state.buffers[0].count(), 0);
    }

    /// The runtime constructor returns a handle for the named port and one
    /// waveform parameter per channel. `_tx` is kept alive so the channel
    /// stays open for the duration of the test.
    #[test]
    fn test_create_ts_port_runtime() {
        let (_tx, rx) = tokio::sync::mpsc::channel(16);
        let (handle, params, _actor_jh, _data_jh) =
            create_ts_port_runtime("TEST_TS_RT", &TEST_CHANNELS, 100, rx);
        assert_eq!(handle.port_name(), "TEST_TS_RT");
        assert_eq!(params.ts_channels.len(), 3);
        handle.shutdown();
    }
}