Skip to main content

ad_plugins_rs/
time_series.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use asyn_rs::param::ParamType;
5use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
6use asyn_rs::runtime::config::RuntimeConfig;
7use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
8use asyn_rs::user::AsynUser;
9use parking_lot::Mutex;
10
11// ===== Stats-specific channel definitions =====
12
13/// Number of stats channels in the time series.
14pub const NUM_STATS_TS_CHANNELS: usize = 23;
15
16/// Channel names for the 23 NDStats time series channels.
17pub const STATS_TS_CHANNEL_NAMES: [&str; NUM_STATS_TS_CHANNELS] = [
18    "TSMinValue",
19    "TSMinX",
20    "TSMinY",
21    "TSMaxValue",
22    "TSMaxX",
23    "TSMaxY",
24    "TSMeanValue",
25    "TSSigma",
26    "TSTotal",
27    "TSNet",
28    "TSCentroidTotal",
29    "TSCentroidX",
30    "TSCentroidY",
31    "TSSigmaX",
32    "TSSigmaY",
33    "TSSigmaXY",
34    "TSSkewX",
35    "TSSkewY",
36    "TSKurtosisX",
37    "TSKurtosisY",
38    "TSEccentricity",
39    "TSOrientation",
40    "TSTimestamp",
41];
42
43// ===== Generic time series data =====
44
45/// Shared data pushed from a plugin processor to a TS port driver.
46/// `values` length must match the channel count configured on the driver.
47pub struct TimeSeriesData {
48    pub values: Vec<f64>,
49}
50
51/// Sender from plugin -> TS port.
52pub type TimeSeriesSender = tokio::sync::mpsc::Sender<TimeSeriesData>;
53/// Receiver in TS port background thread.
54pub type TimeSeriesReceiver = tokio::sync::mpsc::Receiver<TimeSeriesData>;
55
56/// Registry for pending TS receivers, keyed by upstream plugin port name.
57/// NDStatsConfigure etc. store receivers here; NDTimeSeriesConfigure picks them up.
58pub struct TsReceiverRegistry {
59    inner: std::sync::Mutex<std::collections::HashMap<String, (TimeSeriesReceiver, Vec<String>)>>,
60}
61
62impl TsReceiverRegistry {
63    pub fn new() -> Self {
64        Self {
65            inner: std::sync::Mutex::new(std::collections::HashMap::new()),
66        }
67    }
68
69    /// Store a receiver and its channel names for a given upstream port.
70    pub fn store(
71        &self,
72        upstream_port: &str,
73        receiver: TimeSeriesReceiver,
74        channel_names: Vec<String>,
75    ) {
76        let mut map = self.inner.lock().unwrap();
77        map.insert(upstream_port.to_string(), (receiver, channel_names));
78    }
79
80    /// Take a receiver for the given upstream port (returns None if not found or already taken).
81    pub fn take(&self, upstream_port: &str) -> Option<(TimeSeriesReceiver, Vec<String>)> {
82        let mut map = self.inner.lock().unwrap();
83        map.remove(upstream_port)
84    }
85}
86
87impl Default for TsReceiverRegistry {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93/// Accumulation mode for time series.
94#[derive(Debug, Clone, Copy, PartialEq, Eq)]
95pub enum TimeSeriesMode {
96    OneShot,
97    RingBuffer,
98}
99
100/// Time-series accumulator: stores scalar/1D values from successive arrays.
101pub struct TimeSeries {
102    pub num_points: usize,
103    pub mode: TimeSeriesMode,
104    buffer: Vec<f64>,
105    write_pos: usize,
106    count: usize,
107}
108
109impl TimeSeries {
110    pub fn new(num_points: usize, mode: TimeSeriesMode) -> Self {
111        Self {
112            num_points,
113            mode,
114            buffer: vec![0.0; num_points],
115            write_pos: 0,
116            count: 0,
117        }
118    }
119
120    /// Add a value (e.g., mean of an array) to the time series.
121    pub fn add_value(&mut self, value: f64) {
122        match self.mode {
123            TimeSeriesMode::OneShot => {
124                if self.write_pos < self.num_points {
125                    self.buffer[self.write_pos] = value;
126                    self.write_pos += 1;
127                    self.count = self.write_pos;
128                }
129            }
130            TimeSeriesMode::RingBuffer => {
131                self.buffer[self.write_pos % self.num_points] = value;
132                self.write_pos += 1;
133                self.count = self.count.max(self.write_pos.min(self.num_points));
134            }
135        }
136    }
137
138    /// Get the accumulated values in order.
139    pub fn values(&self) -> Vec<f64> {
140        match self.mode {
141            TimeSeriesMode::OneShot => self.buffer[..self.count].to_vec(),
142            TimeSeriesMode::RingBuffer => {
143                if self.write_pos <= self.num_points {
144                    self.buffer[..self.count].to_vec()
145                } else {
146                    let start = self.write_pos % self.num_points;
147                    let mut result = Vec::with_capacity(self.num_points);
148                    result.extend_from_slice(&self.buffer[start..]);
149                    result.extend_from_slice(&self.buffer[..start]);
150                    result
151                }
152            }
153        }
154    }
155
156    pub fn count(&self) -> usize {
157        self.count
158    }
159
160    pub fn reset(&mut self) {
161        self.buffer.fill(0.0);
162        self.write_pos = 0;
163        self.count = 0;
164    }
165
166    /// Resize the buffer. Resets all data.
167    pub fn resize(&mut self, num_points: usize) {
168        self.num_points = num_points;
169        self.buffer = vec![0.0; num_points];
170        self.write_pos = 0;
171        self.count = 0;
172    }
173
174    /// Change the accumulation mode. Resets all data.
175    pub fn set_mode(&mut self, mode: TimeSeriesMode) {
176        self.mode = mode;
177        self.reset();
178    }
179}
180
181// ===== Time Series Port Driver =====
182
183/// Param indices for the TS port.
184pub struct TSParams {
185    pub ts_acquire: usize,
186    pub ts_read: usize,
187    pub ts_num_points: usize,
188    pub ts_current_point: usize,
189    pub ts_time_per_point: usize,
190    pub ts_averaging_time: usize,
191    pub ts_num_average: usize,
192    pub ts_elapsed_time: usize,
193    pub ts_acquire_mode: usize,
194    pub ts_time_axis: usize,
195    /// Per-channel waveform param indices (length = num_channels).
196    pub ts_channels: Vec<usize>,
197    /// Channel names (kept for registry building).
198    pub channel_names: Vec<String>,
199    /// Generic time series waveform (for NDTimeSeries.template).
200    pub ts_time_series: usize,
201    /// Timestamp waveform (for NDTimeSeries.template).
202    pub ts_timestamp: usize,
203}
204
205/// Shared state between the data ingestion thread and the TS port driver.
206pub struct SharedTsState {
207    pub buffers: Vec<TimeSeries>,
208    pub acquiring: bool,
209    pub start_time: Option<Instant>,
210    pub num_points: usize,
211    pub mode: TimeSeriesMode,
212}
213
214impl SharedTsState {
215    fn new(num_channels: usize, num_points: usize) -> Self {
216        let buffers = (0..num_channels)
217            .map(|_| TimeSeries::new(num_points, TimeSeriesMode::OneShot))
218            .collect();
219        Self {
220            buffers,
221            acquiring: false,
222            start_time: None,
223            num_points,
224            mode: TimeSeriesMode::OneShot,
225        }
226    }
227}
228
229/// TS port driver: standalone asyn PortDriver for time series waveforms.
230///
231/// Generic over the number of channels — Stats uses 23, ROIStat uses
232/// a different set, and NDTimeSeries standalone can use any count.
233pub struct TimeSeriesPortDriver {
234    base: PortDriverBase,
235    params: TSParams,
236    shared: Arc<Mutex<SharedTsState>>,
237    num_channels: usize,
238    time_per_point: f64,
239}
240
241impl TimeSeriesPortDriver {
242    fn new(
243        port_name: &str,
244        channel_names: &[&str],
245        num_points: usize,
246        shared: Arc<Mutex<SharedTsState>>,
247    ) -> Self {
248        let num_channels = channel_names.len();
249        let mut base = PortDriverBase::new(
250            port_name,
251            1,
252            PortFlags {
253                multi_device: false,
254                can_block: false,
255                destructible: true,
256            },
257        );
258
259        // NDPluginBase params (NDTimeSeries.template includes NDPluginBase.template)
260        let _ = ad_core_rs::params::ndarray_driver::NDArrayDriverParams::create(&mut base);
261        let _ = ad_core_rs::plugin::params::PluginBaseParams::create(&mut base);
262
263        // Register control params
264        let ts_acquire = base.create_param("TS_ACQUIRE", ParamType::Int32).unwrap();
265        let _ = base.set_int32_param(ts_acquire, 0, 0);
266        let ts_read = base.create_param("TS_READ", ParamType::Int32).unwrap();
267        let ts_num_points = base
268            .create_param("TS_NUM_POINTS", ParamType::Int32)
269            .unwrap();
270        let _ = base.set_int32_param(ts_num_points, 0, num_points as i32);
271        let ts_current_point = base
272            .create_param("TS_CURRENT_POINT", ParamType::Int32)
273            .unwrap();
274        let _ = base.set_int32_param(ts_current_point, 0, 0);
275        let ts_time_per_point = base
276            .create_param("TS_TIME_PER_POINT", ParamType::Float64)
277            .unwrap();
278        let ts_averaging_time = base
279            .create_param("TS_AVERAGING_TIME", ParamType::Float64)
280            .unwrap();
281        let ts_num_average = base
282            .create_param("TS_NUM_AVERAGE", ParamType::Int32)
283            .unwrap();
284        let _ = base.set_int32_param(ts_num_average, 0, 1);
285        let ts_elapsed_time = base
286            .create_param("TS_ELAPSED_TIME", ParamType::Float64)
287            .unwrap();
288        let ts_acquire_mode = base
289            .create_param("TS_ACQUIRE_MODE", ParamType::Int32)
290            .unwrap();
291        let _ = base.set_int32_param(ts_acquire_mode, 0, 0);
292        let ts_time_axis = base
293            .create_param("TS_TIME_AXIS", ParamType::Float64Array)
294            .unwrap();
295
296        // Initialize time axis (scaled by time_per_point, default 1.0)
297        let time_per_point = 1.0;
298        let time_axis: Vec<f64> = (0..num_points).map(|i| i as f64 * time_per_point).collect();
299        let _ = base.params.set_float64_array(ts_time_axis, 0, time_axis);
300
301        // Channel waveform params — one Float64Array per channel
302        let mut ts_channels = Vec::with_capacity(num_channels);
303        for name in channel_names {
304            let param_name = format!("TS_CHAN_{name}");
305            let idx = base
306                .create_param(&param_name, ParamType::Float64Array)
307                .unwrap();
308            let _ = base.params.set_float64_array(idx, 0, vec![0.0; num_points]);
309            ts_channels.push(idx);
310        }
311
312        // Generic time series and timestamp waveform params
313        let ts_time_series = base
314            .create_param("TS_TIME_SERIES", ParamType::Float64Array)
315            .unwrap();
316        let ts_timestamp = base
317            .create_param("TS_TIMESTAMP", ParamType::Float64Array)
318            .unwrap();
319
320        let params = TSParams {
321            ts_acquire,
322            ts_read,
323            ts_num_points,
324            ts_current_point,
325            ts_time_per_point,
326            ts_averaging_time,
327            ts_num_average,
328            ts_elapsed_time,
329            ts_acquire_mode,
330            ts_time_axis,
331            ts_channels,
332            channel_names: channel_names.iter().map(|s| s.to_string()).collect(),
333            ts_time_series,
334            ts_timestamp,
335        };
336
337        Self {
338            base,
339            params,
340            shared,
341            num_channels,
342            time_per_point,
343        }
344    }
345
346    /// Copy buffer data to Float64Array params and call callbacks.
347    fn update_waveform_params(&mut self) {
348        let state = self.shared.lock();
349        let num_points = state.num_points;
350
351        // Update per-channel waveform params
352        for (i, buf) in state.buffers.iter().enumerate() {
353            let mut values = buf.values();
354            values.resize(num_points, 0.0);
355            let _ = self
356                .base
357                .params
358                .set_float64_array(self.params.ts_channels[i], 0, values);
359        }
360
361        // Update current point
362        let current_point = state.buffers[0].count();
363        let _ = self
364            .base
365            .set_int32_param(self.params.ts_current_point, 0, current_point as i32);
366
367        // Update elapsed time
368        if let Some(start) = state.start_time {
369            let elapsed = start.elapsed().as_secs_f64();
370            let _ = self
371                .base
372                .set_float64_param(self.params.ts_elapsed_time, 0, elapsed);
373        }
374
375        // Update acquire status (may have auto-stopped)
376        let acquiring = state.acquiring;
377        drop(state);
378
379        let _ = self
380            .base
381            .set_int32_param(self.params.ts_acquire, 0, if acquiring { 1 } else { 0 });
382
383        // Notify listeners
384        let _ = self.base.call_param_callbacks(0);
385    }
386}
387
388impl PortDriver for TimeSeriesPortDriver {
389    fn base(&self) -> &PortDriverBase {
390        &self.base
391    }
392
393    fn base_mut(&mut self) -> &mut PortDriverBase {
394        &mut self.base
395    }
396
397    fn write_int32(&mut self, user: &mut AsynUser, value: i32) -> asyn_rs::error::AsynResult<()> {
398        let reason = user.reason;
399
400        if reason == self.params.ts_acquire {
401            let mut state = self.shared.lock();
402            if value != 0 {
403                // Start acquiring
404                if !state.acquiring {
405                    // If buffers are empty, this is Erase/Start
406                    if state.buffers[0].count() == 0 {
407                        for buf in state.buffers.iter_mut() {
408                            buf.reset();
409                        }
410                    }
411                    state.acquiring = true;
412                    state.start_time = Some(Instant::now());
413                }
414            } else {
415                // Stop
416                state.acquiring = false;
417            }
418            drop(state);
419            self.base.set_int32_param(reason, 0, value)?;
420            self.base.call_param_callbacks(0)?;
421        } else if reason == self.params.ts_read {
422            // Trigger waveform update
423            self.update_waveform_params();
424        } else if reason == self.params.ts_num_points {
425            let new_size = value.max(1) as usize;
426            let mut state = self.shared.lock();
427            state.num_points = new_size;
428            for buf in state.buffers.iter_mut() {
429                buf.resize(new_size);
430            }
431            state.acquiring = false;
432            drop(state);
433
434            // Update time axis
435            let time_axis: Vec<f64> = (0..new_size)
436                .map(|i| i as f64 * self.time_per_point)
437                .collect();
438            let _ = self
439                .base
440                .params
441                .set_float64_array(self.params.ts_time_axis, 0, time_axis);
442
443            // Re-initialize channel waveforms
444            for i in 0..self.num_channels {
445                let _ = self.base.params.set_float64_array(
446                    self.params.ts_channels[i],
447                    0,
448                    vec![0.0; new_size],
449                );
450            }
451
452            self.base.set_int32_param(reason, 0, value)?;
453            self.base
454                .set_int32_param(self.params.ts_current_point, 0, 0)?;
455            self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
456            self.base.call_param_callbacks(0)?;
457        } else if reason == self.params.ts_acquire_mode {
458            let mode = if value == 0 {
459                TimeSeriesMode::OneShot
460            } else {
461                TimeSeriesMode::RingBuffer
462            };
463            let mut state = self.shared.lock();
464            state.mode = mode;
465            for buf in state.buffers.iter_mut() {
466                buf.set_mode(mode);
467            }
468            state.acquiring = false;
469            drop(state);
470
471            self.base.set_int32_param(reason, 0, value)?;
472            self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
473            self.base.call_param_callbacks(0)?;
474        } else {
475            // Default: store in param cache
476            self.base.set_int32_param(reason, user.addr, value)?;
477            self.base.call_param_callbacks(user.addr)?;
478        }
479
480        Ok(())
481    }
482
483    fn write_float64(&mut self, user: &mut AsynUser, value: f64) -> asyn_rs::error::AsynResult<()> {
484        let reason = user.reason;
485        if reason == self.params.ts_time_per_point {
486            self.time_per_point = value;
487            self.base.set_float64_param(reason, user.addr, value)?;
488            // Rebuild time axis with new scaling
489            let num_points = self.shared.lock().num_points;
490            let time_axis: Vec<f64> = (0..num_points)
491                .map(|i| i as f64 * self.time_per_point)
492                .collect();
493            let _ = self
494                .base
495                .params
496                .set_float64_array(self.params.ts_time_axis, 0, time_axis);
497            self.base.call_param_callbacks(user.addr)?;
498        } else {
499            self.base.set_float64_param(reason, user.addr, value)?;
500            self.base.call_param_callbacks(user.addr)?;
501        }
502        Ok(())
503    }
504
505    fn read_float64_array(
506        &mut self,
507        user: &AsynUser,
508        buf: &mut [f64],
509    ) -> asyn_rs::error::AsynResult<usize> {
510        let data = self.base.params.get_float64_array(user.reason, user.addr)?;
511        let n = data.len().min(buf.len());
512        buf[..n].copy_from_slice(&data[..n]);
513        Ok(n)
514    }
515}
516
517/// Background thread that receives data from a plugin and accumulates into shared buffers.
518fn ts_data_thread(shared: Arc<Mutex<SharedTsState>>, mut data_rx: TimeSeriesReceiver) {
519    while let Some(data) = data_rx.blocking_recv() {
520        let mut state = shared.lock();
521        if !state.acquiring {
522            continue;
523        }
524        let n = data.values.len().min(state.buffers.len());
525        for i in 0..n {
526            state.buffers[i].add_value(data.values[i]);
527        }
528        // Auto-stop for OneShot mode
529        if state.mode == TimeSeriesMode::OneShot && state.buffers[0].count() >= state.num_points {
530            state.acquiring = false;
531        }
532    }
533}
534
535/// Create a TS port runtime.
536///
537/// `channel_names` defines the number and names of time series channels.
538/// Returns the port runtime handle, the TS params (for building a registry),
539/// and thread join handles for the actor and data ingestion threads.
540pub fn create_ts_port_runtime(
541    port_name: &str,
542    channel_names: &[&str],
543    num_points: usize,
544    data_rx: TimeSeriesReceiver,
545) -> (
546    PortRuntimeHandle,
547    TSParams,
548    std::thread::JoinHandle<()>,
549    std::thread::JoinHandle<()>,
550) {
551    let num_channels = channel_names.len();
552    let shared = Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)));
553
554    let driver = TimeSeriesPortDriver::new(port_name, channel_names, num_points, shared.clone());
555
556    // Capture params before the driver is moved into the actor
557    let ts_params = TSParams {
558        ts_acquire: driver.params.ts_acquire,
559        ts_read: driver.params.ts_read,
560        ts_num_points: driver.params.ts_num_points,
561        ts_current_point: driver.params.ts_current_point,
562        ts_time_per_point: driver.params.ts_time_per_point,
563        ts_averaging_time: driver.params.ts_averaging_time,
564        ts_num_average: driver.params.ts_num_average,
565        ts_elapsed_time: driver.params.ts_elapsed_time,
566        ts_acquire_mode: driver.params.ts_acquire_mode,
567        ts_time_axis: driver.params.ts_time_axis,
568        ts_channels: driver.params.ts_channels.clone(),
569        channel_names: driver.params.channel_names.clone(),
570        ts_time_series: driver.params.ts_time_series,
571        ts_timestamp: driver.params.ts_timestamp,
572    };
573
574    let (runtime_handle, actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
575
576    // Spawn data ingestion thread
577    let data_jh = std::thread::Builder::new()
578        .name(format!("ts-data-{port_name}"))
579        .spawn(move || {
580            ts_data_thread(shared, data_rx);
581        })
582        .expect("failed to spawn TS data thread");
583
584    (runtime_handle, ts_params, actor_jh, data_jh)
585}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590
591    #[test]
592    fn test_one_shot() {
593        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
594        for i in 0..5 {
595            ts.add_value(i as f64);
596        }
597        assert_eq!(ts.count(), 5);
598        assert_eq!(ts.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);
599
600        // Adding beyond capacity is a no-op
601        ts.add_value(99.0);
602        assert_eq!(ts.count(), 5);
603    }
604
605    #[test]
606    fn test_ring_buffer() {
607        let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
608        for i in 0..6 {
609            ts.add_value(i as f64);
610        }
611        assert_eq!(ts.count(), 4);
612        // Should contain [2, 3, 4, 5] in order
613        assert_eq!(ts.values(), vec![2.0, 3.0, 4.0, 5.0]);
614    }
615
616    #[test]
617    fn test_ring_buffer_partial() {
618        let mut ts = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
619        ts.add_value(10.0);
620        ts.add_value(20.0);
621        assert_eq!(ts.count(), 2);
622        assert_eq!(ts.values(), vec![10.0, 20.0]);
623    }
624
625    #[test]
626    fn test_reset() {
627        let mut ts = TimeSeries::new(3, TimeSeriesMode::OneShot);
628        ts.add_value(1.0);
629        ts.add_value(2.0);
630        ts.reset();
631        assert_eq!(ts.count(), 0);
632        assert!(ts.values().is_empty());
633    }
634
635    #[test]
636    fn test_resize() {
637        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
638        ts.add_value(1.0);
639        ts.add_value(2.0);
640        ts.resize(3);
641        assert_eq!(ts.num_points, 3);
642        assert_eq!(ts.count(), 0);
643        assert!(ts.values().is_empty());
644    }
645
646    #[test]
647    fn test_set_mode() {
648        let mut ts = TimeSeries::new(5, TimeSeriesMode::OneShot);
649        ts.add_value(1.0);
650        ts.set_mode(TimeSeriesMode::RingBuffer);
651        assert_eq!(ts.mode, TimeSeriesMode::RingBuffer);
652        assert_eq!(ts.count(), 0);
653    }
654
655    // --- TS port driver tests (using a small channel set for simplicity) ---
656
657    const TEST_CHANNELS: [&str; 3] = ["ChA", "ChB", "ChC"];
658
659    #[test]
660    fn test_shared_ts_state_init() {
661        let state = SharedTsState::new(3, 100);
662        assert_eq!(state.buffers.len(), 3);
663        assert_eq!(state.num_points, 100);
664        assert!(!state.acquiring);
665        assert_eq!(state.mode, TimeSeriesMode::OneShot);
666    }
667
668    #[test]
669    fn test_ts_port_driver_create() {
670        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
671        let driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared);
672        assert_eq!(driver.base().port_name, "TEST_TS");
673        assert_eq!(driver.num_channels, 3);
674        assert!(!driver.base().flags.multi_device);
675    }
676
677    #[test]
678    fn test_ts_port_driver_write_acquire() {
679        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
680        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
681
682        // Start acquiring
683        let mut user = AsynUser::new(driver.params.ts_acquire);
684        driver.write_int32(&mut user, 1).unwrap();
685        assert!(shared.lock().acquiring);
686
687        // Stop acquiring
688        driver.write_int32(&mut user, 0).unwrap();
689        assert!(!shared.lock().acquiring);
690    }
691
692    #[test]
693    fn test_ts_port_driver_write_num_points() {
694        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
695        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
696
697        let mut user = AsynUser::new(driver.params.ts_num_points);
698        driver.write_int32(&mut user, 50).unwrap();
699
700        let state = shared.lock();
701        assert_eq!(state.num_points, 50);
702        for buf in &state.buffers {
703            assert_eq!(buf.num_points, 50);
704        }
705    }
706
707    #[test]
708    fn test_ts_port_driver_write_mode() {
709        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 100)));
710        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 100, shared.clone());
711
712        let mut user = AsynUser::new(driver.params.ts_acquire_mode);
713        driver.write_int32(&mut user, 1).unwrap();
714
715        let state = shared.lock();
716        assert_eq!(state.mode, TimeSeriesMode::RingBuffer);
717        for buf in &state.buffers {
718            assert_eq!(buf.mode, TimeSeriesMode::RingBuffer);
719        }
720    }
721
722    #[test]
723    fn test_ts_port_driver_update_waveforms() {
724        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
725        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 10, shared.clone());
726
727        // Add some data
728        {
729            let mut state = shared.lock();
730            state.acquiring = true;
731            state.start_time = Some(Instant::now());
732            for buf in state.buffers.iter_mut() {
733                buf.add_value(42.0);
734                buf.add_value(43.0);
735            }
736        }
737
738        // Trigger update
739        driver.update_waveform_params();
740
741        // Check current point was updated
742        let cp = driver
743            .base
744            .get_int32_param(driver.params.ts_current_point, 0)
745            .unwrap();
746        assert_eq!(cp, 2);
747
748        // Check waveform data was written
749        let data = driver
750            .base
751            .params
752            .get_float64_array(driver.params.ts_channels[0], 0)
753            .unwrap();
754        assert_eq!(data[0], 42.0);
755        assert_eq!(data[1], 43.0);
756    }
757
758    #[test]
759    fn test_ts_port_driver_read_array() {
760        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 5)));
761        let mut driver = TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, 5, shared);
762
763        let user = AsynUser::new(driver.params.ts_time_axis);
764        let mut buf = vec![0.0; 5];
765        let n = driver.read_float64_array(&user, &mut buf).unwrap();
766        assert_eq!(n, 5);
767        assert_eq!(buf, vec![0.0, 1.0, 2.0, 3.0, 4.0]);
768    }
769
770    #[test]
771    fn test_ts_data_ingestion_oneshot() {
772        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 3)));
773        let (tx, rx) = tokio::sync::mpsc::channel(16);
774
775        // Start acquiring
776        shared.lock().acquiring = true;
777
778        let shared_clone = shared.clone();
779        let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));
780
781        // Send data
782        tx.blocking_send(TimeSeriesData {
783            values: vec![1.0, 10.0, 100.0],
784        })
785        .unwrap();
786        tx.blocking_send(TimeSeriesData {
787            values: vec![2.0, 20.0, 200.0],
788        })
789        .unwrap();
790        tx.blocking_send(TimeSeriesData {
791            values: vec![3.0, 30.0, 300.0],
792        })
793        .unwrap();
794        tx.blocking_send(TimeSeriesData {
795            values: vec![4.0, 40.0, 400.0],
796        })
797        .unwrap(); // beyond capacity
798
799        // Close channel and wait for thread
800        drop(tx);
801        jh.join().unwrap();
802
803        let state = shared.lock();
804        assert_eq!(state.buffers[0].count(), 3);
805        assert_eq!(state.buffers[0].values(), vec![1.0, 2.0, 3.0]);
806        assert_eq!(state.buffers[1].values(), vec![10.0, 20.0, 30.0]);
807        assert_eq!(state.buffers[2].values(), vec![100.0, 200.0, 300.0]);
808        assert!(!state.acquiring); // auto-stopped
809    }
810
811    #[test]
812    fn test_ts_data_ingestion_not_acquiring() {
813        let shared = Arc::new(Mutex::new(SharedTsState::new(3, 10)));
814        let (tx, rx) = tokio::sync::mpsc::channel(16);
815
816        // Not acquiring (default)
817        let shared_clone = shared.clone();
818        let jh = std::thread::spawn(move || ts_data_thread(shared_clone, rx));
819
820        tx.blocking_send(TimeSeriesData {
821            values: vec![1.0, 2.0, 3.0],
822        })
823        .unwrap();
824
825        drop(tx);
826        jh.join().unwrap();
827
828        let state = shared.lock();
829        assert_eq!(state.buffers[0].count(), 0);
830    }
831
832    #[test]
833    fn test_create_ts_port_runtime() {
834        let (_tx, rx) = tokio::sync::mpsc::channel(16);
835        let (handle, params, _actor_jh, _data_jh) =
836            create_ts_port_runtime("TEST_TS_RT", &TEST_CHANNELS, 100, rx);
837        assert_eq!(handle.port_name(), "TEST_TS_RT");
838        assert_eq!(params.ts_channels.len(), 3);
839        handle.shutdown();
840    }
841}