Skip to main content

ad_plugins/
time_series.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use asyn_rs::param::ParamType;
6use asyn_rs::port::{PortDriver, PortDriverBase, PortFlags};
7use asyn_rs::user::AsynUser;
8use asyn_rs::runtime::config::RuntimeConfig;
9use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
10use parking_lot::Mutex;
11
12use ad_core::plugin::registry::{ParamInfo, ParamRegistry};
13
14// ===== Stats-specific channel definitions =====
15
/// Count of NDStats channels recorded as time series.
pub const NUM_STATS_TS_CHANNELS: usize = 23;

/// Names of the NDStats time series channels, in channel order.
///
/// The array length is tied to [`NUM_STATS_TS_CHANNELS`], so the list and the
/// count cannot drift apart without a compile error.
pub const STATS_TS_CHANNEL_NAMES: [&str; NUM_STATS_TS_CHANNELS] = [
    // Minimum and its position
    "TSMinValue",
    "TSMinX",
    "TSMinY",
    // Maximum and its position
    "TSMaxValue",
    "TSMaxX",
    "TSMaxY",
    // Whole-array statistics
    "TSMeanValue",
    "TSSigma",
    "TSTotal",
    "TSNet",
    // Centroid and higher moments
    "TSCentroidTotal",
    "TSCentroidX",
    "TSCentroidY",
    "TSSigmaX",
    "TSSigmaY",
    "TSSigmaXY",
    "TSSkewX",
    "TSSkewY",
    "TSKurtosisX",
    "TSKurtosisY",
    "TSEccentricity",
    "TSOrientation",
    // Timestamp
    "TSTimestamp",
];
45
46// ===== Generic time series data =====
47
/// One row of samples pushed from a plugin processor to a TS port driver.
///
/// `values` carries one scalar per channel, in channel order. Its length
/// should match the channel count configured on the driver.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TimeSeriesData {
    /// Per-channel sample values for a single time point.
    pub values: Vec<f64>,
}
53
54/// Sender from plugin -> TS port.
55pub type TimeSeriesSender = tokio::sync::mpsc::Sender<TimeSeriesData>;
56/// Receiver in TS port background thread.
57pub type TimeSeriesReceiver = tokio::sync::mpsc::Receiver<TimeSeriesData>;
58
/// Accumulation mode for time series.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeSeriesMode {
    /// Fill the buffer once; samples arriving after it is full are dropped.
    OneShot,
    /// Keep the most recent `num_points` samples, overwriting the oldest.
    RingBuffer,
}

/// Time-series accumulator: stores scalar/1D values from successive arrays.
///
/// `num_points` and `mode` are public for inspection. Use [`TimeSeries::resize`]
/// and [`TimeSeries::set_mode`] to change them, since those also reset the
/// stored data.
#[derive(Debug, Clone)]
pub struct TimeSeries {
    /// Configured capacity in samples.
    pub num_points: usize,
    /// Current accumulation mode.
    pub mode: TimeSeriesMode,
    /// Backing storage, allocated with `num_points` slots.
    buffer: Vec<f64>,
    /// Next slot to write. In ring-buffer mode this is kept wrapped into
    /// `0..capacity`, so it cannot overflow on long acquisitions.
    write_pos: usize,
    /// Number of valid samples currently stored.
    count: usize,
}

impl TimeSeries {
    /// Creates an empty accumulator holding up to `num_points` samples.
    ///
    /// A capacity of zero is allowed; such an accumulator ignores every sample.
    pub fn new(num_points: usize, mode: TimeSeriesMode) -> Self {
        Self {
            num_points,
            mode,
            buffer: vec![0.0; num_points],
            write_pos: 0,
            count: 0,
        }
    }

    /// Usable capacity: never larger than the allocated storage, even if the
    /// public `num_points` field was changed without calling `resize`.
    fn capacity(&self) -> usize {
        self.num_points.min(self.buffer.len())
    }

    /// Add a value (e.g., mean of an array) to the time series.
    ///
    /// In one-shot mode the value is dropped once the buffer is full. In
    /// ring-buffer mode the oldest value is overwritten. With zero capacity
    /// the call is a no-op in both modes.
    pub fn add_value(&mut self, value: f64) {
        let cap = self.capacity();
        if cap == 0 {
            // Nothing to store into; also avoids `% 0` in ring-buffer mode.
            return;
        }
        match self.mode {
            TimeSeriesMode::OneShot => {
                if self.write_pos < cap {
                    self.buffer[self.write_pos] = value;
                    self.write_pos += 1;
                    self.count = self.write_pos;
                }
            }
            TimeSeriesMode::RingBuffer => {
                let slot = self.write_pos % cap;
                self.buffer[slot] = value;
                // Store the wrapped position instead of a running total.
                self.write_pos = (slot + 1) % cap;
                self.count = (self.count + 1).min(cap);
            }
        }
    }

    /// Get the accumulated values in order (oldest first).
    pub fn values(&self) -> Vec<f64> {
        match self.mode {
            TimeSeriesMode::OneShot => self.buffer[..self.count].to_vec(),
            TimeSeriesMode::RingBuffer => {
                let cap = self.capacity();
                if cap == 0 || self.count < cap {
                    // Not wrapped yet: storage order is already time order.
                    self.buffer[..self.count].to_vec()
                } else {
                    // Full: the oldest sample sits at the next write slot.
                    let start = self.write_pos % cap;
                    let mut result = Vec::with_capacity(cap);
                    result.extend_from_slice(&self.buffer[start..cap]);
                    result.extend_from_slice(&self.buffer[..start]);
                    result
                }
            }
        }
    }

    /// Number of valid samples currently stored.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Discards all samples, keeping capacity and mode.
    pub fn reset(&mut self) {
        self.buffer.fill(0.0);
        self.write_pos = 0;
        self.count = 0;
    }

    /// Resize the buffer. Resets all data.
    pub fn resize(&mut self, num_points: usize) {
        self.num_points = num_points;
        self.buffer = vec![0.0; num_points];
        self.write_pos = 0;
        self.count = 0;
    }

    /// Change the accumulation mode. Resets all data.
    pub fn set_mode(&mut self, mode: TimeSeriesMode) {
        self.mode = mode;
        self.reset();
    }
}
146
147// ===== Time Series Port Driver =====
148
/// Param indices for the TS port.
///
/// Each field holds the index returned when the corresponding parameter was
/// created on the port. The struct is `Clone` so a copy can be handed out
/// while the driver keeps its own.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TSParams {
    /// Index of `TS_ACQUIRE`.
    pub ts_acquire: usize,
    /// Index of `TS_READ`.
    pub ts_read: usize,
    /// Index of `TS_NUM_POINTS`.
    pub ts_num_points: usize,
    /// Index of `TS_CURRENT_POINT`.
    pub ts_current_point: usize,
    /// Index of `TS_TIME_PER_POINT`.
    pub ts_time_per_point: usize,
    /// Index of `TS_AVERAGING_TIME`.
    pub ts_averaging_time: usize,
    /// Index of `TS_NUM_AVERAGE`.
    pub ts_num_average: usize,
    /// Index of `TS_ELAPSED_TIME`.
    pub ts_elapsed_time: usize,
    /// Index of `TS_ACQUIRE_MODE`.
    pub ts_acquire_mode: usize,
    /// Index of the `TS_TIME_AXIS` waveform.
    pub ts_time_axis: usize,
    /// Per-channel waveform param indices (length = num_channels).
    pub ts_channels: Vec<usize>,
    /// Channel names (kept for registry building).
    pub channel_names: Vec<String>,
}
166
167/// Shared state between the data ingestion thread and the TS port driver.
168pub struct SharedTsState {
169    pub buffers: Vec<TimeSeries>,
170    pub acquiring: bool,
171    pub start_time: Option<Instant>,
172    pub num_points: usize,
173    pub mode: TimeSeriesMode,
174}
175
176impl SharedTsState {
177    fn new(num_channels: usize, num_points: usize) -> Self {
178        let buffers = (0..num_channels)
179            .map(|_| TimeSeries::new(num_points, TimeSeriesMode::OneShot))
180            .collect();
181        Self {
182            buffers,
183            acquiring: false,
184            start_time: None,
185            num_points,
186            mode: TimeSeriesMode::OneShot,
187        }
188    }
189}
190
191/// TS port driver: standalone asyn PortDriver for time series waveforms.
192///
193/// Generic over the number of channels — Stats uses 23, ROIStat uses
194/// a different set, and NDTimeSeries standalone can use any count.
195pub struct TimeSeriesPortDriver {
196    base: PortDriverBase,
197    params: TSParams,
198    shared: Arc<Mutex<SharedTsState>>,
199    num_channels: usize,
200}
201
202impl TimeSeriesPortDriver {
203    fn new(
204        port_name: &str,
205        channel_names: &[&str],
206        num_points: usize,
207        shared: Arc<Mutex<SharedTsState>>,
208    ) -> Self {
209        let num_channels = channel_names.len();
210        let mut base = PortDriverBase::new(port_name, 1, PortFlags {
211            multi_device: false,
212            can_block: false,
213            destructible: true,
214        });
215
216        // Register control params
217        let ts_acquire = base.create_param("TS_ACQUIRE", ParamType::Int32).unwrap();
218        let _ = base.set_int32_param(ts_acquire, 0, 0);
219        let ts_read = base.create_param("TS_READ", ParamType::Int32).unwrap();
220        let ts_num_points = base.create_param("TS_NUM_POINTS", ParamType::Int32).unwrap();
221        let _ = base.set_int32_param(ts_num_points, 0, num_points as i32);
222        let ts_current_point = base.create_param("TS_CURRENT_POINT", ParamType::Int32).unwrap();
223        let _ = base.set_int32_param(ts_current_point, 0, 0);
224        let ts_time_per_point = base.create_param("TS_TIME_PER_POINT", ParamType::Float64).unwrap();
225        let ts_averaging_time = base.create_param("TS_AVERAGING_TIME", ParamType::Float64).unwrap();
226        let ts_num_average = base.create_param("TS_NUM_AVERAGE", ParamType::Int32).unwrap();
227        let _ = base.set_int32_param(ts_num_average, 0, 1);
228        let ts_elapsed_time = base.create_param("TS_ELAPSED_TIME", ParamType::Float64).unwrap();
229        let ts_acquire_mode = base.create_param("TS_ACQUIRE_MODE", ParamType::Int32).unwrap();
230        let _ = base.set_int32_param(ts_acquire_mode, 0, 0);
231        let ts_time_axis = base.create_param("TS_TIME_AXIS", ParamType::Float64Array).unwrap();
232
233        // Initialize time axis
234        let time_axis: Vec<f64> = (0..num_points).map(|i| i as f64).collect();
235        let _ = base.params.set_float64_array(ts_time_axis, 0, time_axis);
236
237        // Channel waveform params — one Float64Array per channel
238        let mut ts_channels = Vec::with_capacity(num_channels);
239        for name in channel_names {
240            let param_name = format!("TS_CHAN_{name}");
241            let idx = base.create_param(&param_name, ParamType::Float64Array).unwrap();
242            let _ = base.params.set_float64_array(idx, 0, vec![0.0; num_points]);
243            ts_channels.push(idx);
244        }
245
246        let params = TSParams {
247            ts_acquire,
248            ts_read,
249            ts_num_points,
250            ts_current_point,
251            ts_time_per_point,
252            ts_averaging_time,
253            ts_num_average,
254            ts_elapsed_time,
255            ts_acquire_mode,
256            ts_time_axis,
257            ts_channels,
258            channel_names: channel_names.iter().map(|s| s.to_string()).collect(),
259        };
260
261        Self { base, params, shared, num_channels }
262    }
263
264    /// Copy buffer data to Float64Array params and call callbacks.
265    fn update_waveform_params(&mut self) {
266        let state = self.shared.lock();
267        let num_points = state.num_points;
268
269        // Update per-channel waveform params
270        for (i, buf) in state.buffers.iter().enumerate() {
271            let mut values = buf.values();
272            values.resize(num_points, 0.0);
273            let _ = self.base.params.set_float64_array(
274                self.params.ts_channels[i], 0, values,
275            );
276        }
277
278        // Update current point
279        let current_point = state.buffers[0].count();
280        let _ = self.base.set_int32_param(self.params.ts_current_point, 0, current_point as i32);
281
282        // Update elapsed time
283        if let Some(start) = state.start_time {
284            let elapsed = start.elapsed().as_secs_f64();
285            let _ = self.base.set_float64_param(self.params.ts_elapsed_time, 0, elapsed);
286        }
287
288        // Update acquire status (may have auto-stopped)
289        let acquiring = state.acquiring;
290        drop(state);
291
292        let _ = self.base.set_int32_param(self.params.ts_acquire, 0, if acquiring { 1 } else { 0 });
293
294        // Notify listeners
295        let _ = self.base.call_param_callbacks(0);
296    }
297}
298
299impl PortDriver for TimeSeriesPortDriver {
300    fn base(&self) -> &PortDriverBase {
301        &self.base
302    }
303
304    fn base_mut(&mut self) -> &mut PortDriverBase {
305        &mut self.base
306    }
307
308    fn write_int32(&mut self, user: &mut AsynUser, value: i32) -> asyn_rs::error::AsynResult<()> {
309        let reason = user.reason;
310
311        if reason == self.params.ts_acquire {
312            let mut state = self.shared.lock();
313            if value != 0 {
314                // Start acquiring
315                if !state.acquiring {
316                    // If buffers are empty, this is Erase/Start
317                    if state.buffers[0].count() == 0 {
318                        for buf in state.buffers.iter_mut() {
319                            buf.reset();
320                        }
321                    }
322                    state.acquiring = true;
323                    state.start_time = Some(Instant::now());
324                }
325            } else {
326                // Stop
327                state.acquiring = false;
328            }
329            drop(state);
330            self.base.set_int32_param(reason, 0, value)?;
331            self.base.call_param_callbacks(0)?;
332        } else if reason == self.params.ts_read {
333            // Trigger waveform update
334            self.update_waveform_params();
335        } else if reason == self.params.ts_num_points {
336            let new_size = value.max(1) as usize;
337            let mut state = self.shared.lock();
338            state.num_points = new_size;
339            for buf in state.buffers.iter_mut() {
340                buf.resize(new_size);
341            }
342            state.acquiring = false;
343            drop(state);
344
345            // Update time axis
346            let time_axis: Vec<f64> = (0..new_size).map(|i| i as f64).collect();
347            let _ = self.base.params.set_float64_array(self.params.ts_time_axis, 0, time_axis);
348
349            // Re-initialize channel waveforms
350            for i in 0..self.num_channels {
351                let _ = self.base.params.set_float64_array(
352                    self.params.ts_channels[i], 0, vec![0.0; new_size],
353                );
354            }
355
356            self.base.set_int32_param(reason, 0, value)?;
357            self.base.set_int32_param(self.params.ts_current_point, 0, 0)?;
358            self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
359            self.base.call_param_callbacks(0)?;
360        } else if reason == self.params.ts_acquire_mode {
361            let mode = if value == 0 {
362                TimeSeriesMode::OneShot
363            } else {
364                TimeSeriesMode::RingBuffer
365            };
366            let mut state = self.shared.lock();
367            state.mode = mode;
368            for buf in state.buffers.iter_mut() {
369                buf.set_mode(mode);
370            }
371            state.acquiring = false;
372            drop(state);
373
374            self.base.set_int32_param(reason, 0, value)?;
375            self.base.set_int32_param(self.params.ts_acquire, 0, 0)?;
376            self.base.call_param_callbacks(0)?;
377        } else {
378            // Default: store in param cache
379            self.base.set_int32_param(reason, user.addr, value)?;
380            self.base.call_param_callbacks(user.addr)?;
381        }
382
383        Ok(())
384    }
385
386    fn read_float64_array(
387        &mut self,
388        user: &AsynUser,
389        buf: &mut [f64],
390    ) -> asyn_rs::error::AsynResult<usize> {
391        let data = self.base.params.get_float64_array(user.reason, user.addr)?;
392        let n = data.len().min(buf.len());
393        buf[..n].copy_from_slice(&data[..n]);
394        Ok(n)
395    }
396}
397
398/// Background thread that receives data from a plugin and accumulates into shared buffers.
399fn ts_data_thread(
400    shared: Arc<Mutex<SharedTsState>>,
401    mut data_rx: TimeSeriesReceiver,
402) {
403    while let Some(data) = data_rx.blocking_recv() {
404        let mut state = shared.lock();
405        if !state.acquiring {
406            continue;
407        }
408        let n = data.values.len().min(state.buffers.len());
409        for i in 0..n {
410            state.buffers[i].add_value(data.values[i]);
411        }
412        // Auto-stop for OneShot mode
413        if state.mode == TimeSeriesMode::OneShot
414            && state.buffers[0].count() >= state.num_points
415        {
416            state.acquiring = false;
417        }
418    }
419}
420
421/// Create a TS port runtime.
422///
423/// `channel_names` defines the number and names of time series channels.
424/// Returns the port runtime handle, the TS params (for building a registry),
425/// and thread join handles for the actor and data ingestion threads.
426pub fn create_ts_port_runtime(
427    port_name: &str,
428    channel_names: &[&str],
429    num_points: usize,
430    data_rx: TimeSeriesReceiver,
431) -> (PortRuntimeHandle, TSParams, std::thread::JoinHandle<()>, std::thread::JoinHandle<()>) {
432    let num_channels = channel_names.len();
433    let shared = Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)));
434
435    let driver = TimeSeriesPortDriver::new(port_name, channel_names, num_points, shared.clone());
436
437    // Capture params before the driver is moved into the actor
438    let ts_params = TSParams {
439        ts_acquire: driver.params.ts_acquire,
440        ts_read: driver.params.ts_read,
441        ts_num_points: driver.params.ts_num_points,
442        ts_current_point: driver.params.ts_current_point,
443        ts_time_per_point: driver.params.ts_time_per_point,
444        ts_averaging_time: driver.params.ts_averaging_time,
445        ts_num_average: driver.params.ts_num_average,
446        ts_elapsed_time: driver.params.ts_elapsed_time,
447        ts_acquire_mode: driver.params.ts_acquire_mode,
448        ts_time_axis: driver.params.ts_time_axis,
449        ts_channels: driver.params.ts_channels.clone(),
450        channel_names: driver.params.channel_names.clone(),
451    };
452
453    let (runtime_handle, actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
454
455    // Spawn data ingestion thread
456    let data_jh = std::thread::Builder::new()
457        .name(format!("ts-data-{port_name}"))
458        .spawn(move || {
459            ts_data_thread(shared, data_rx);
460        })
461        .expect("failed to spawn TS data thread");
462
463    (runtime_handle, ts_params, actor_jh, data_jh)
464}
465
466/// Build a parameter registry for the TS port.
467///
468/// The channel names from `TSParams` are used as record suffix keys.
469pub fn build_ts_registry(tp: &TSParams) -> ParamRegistry {
470    let mut map: ParamRegistry = HashMap::new();
471
472    // Control params
473    map.insert("TSAcquire".into(), ParamInfo::int32(tp.ts_acquire, "TS_ACQUIRE"));
474    map.insert("TSAcquiring".into(), ParamInfo::int32(tp.ts_acquire, "TS_ACQUIRE"));
475    map.insert("TSRead".into(), ParamInfo::int32(tp.ts_read, "TS_READ"));
476    map.insert("TSNumPoints".into(), ParamInfo::int32(tp.ts_num_points, "TS_NUM_POINTS"));
477    map.insert("TSNumPoints_RBV".into(), ParamInfo::int32(tp.ts_num_points, "TS_NUM_POINTS"));
478    map.insert("TSCurrentPoint".into(), ParamInfo::int32(tp.ts_current_point, "TS_CURRENT_POINT"));
479    map.insert("TSTimePerPoint".into(), ParamInfo::float64(tp.ts_time_per_point, "TS_TIME_PER_POINT"));
480    map.insert("TSTimePerPoint_RBV".into(), ParamInfo::float64(tp.ts_time_per_point, "TS_TIME_PER_POINT"));
481    map.insert("TSAveragingTime".into(), ParamInfo::float64(tp.ts_averaging_time, "TS_AVERAGING_TIME"));
482    map.insert("TSAveragingTime_RBV".into(), ParamInfo::float64(tp.ts_averaging_time, "TS_AVERAGING_TIME"));
483    map.insert("TSNumAverage".into(), ParamInfo::int32(tp.ts_num_average, "TS_NUM_AVERAGE"));
484    map.insert("TSElapsedTime".into(), ParamInfo::float64(tp.ts_elapsed_time, "TS_ELAPSED_TIME"));
485    map.insert("TSAcquireMode".into(), ParamInfo::int32(tp.ts_acquire_mode, "TS_ACQUIRE_MODE"));
486    map.insert("TSAcquireMode_RBV".into(), ParamInfo::int32(tp.ts_acquire_mode, "TS_ACQUIRE_MODE"));
487
488    // Time axis waveform
489    map.insert("TSTimeAxis".into(), ParamInfo::float64_array(tp.ts_time_axis, "TS_TIME_AXIS"));
490
491    // Per-channel waveforms (names come from the channel_names passed at creation)
492    for (i, name) in tp.channel_names.iter().enumerate() {
493        let drv_info = format!("TS_CHAN_{name}");
494        map.insert(name.clone(), ParamInfo::float64_array(tp.ts_channels[i], &drv_info));
495    }
496
497    map
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// Small channel set used by the port driver tests.
    const TEST_CHANNELS: [&str; 3] = ["ChA", "ChB", "ChC"];

    /// Fresh shared state wrapped the way the driver expects it.
    fn shared_state(num_channels: usize, num_points: usize) -> Arc<Mutex<SharedTsState>> {
        Arc::new(Mutex::new(SharedTsState::new(num_channels, num_points)))
    }

    /// A "TEST_TS" driver over `TEST_CHANNELS`, plus a handle to its state.
    fn test_driver(num_points: usize) -> (TimeSeriesPortDriver, Arc<Mutex<SharedTsState>>) {
        let shared = shared_state(3, num_points);
        let driver =
            TimeSeriesPortDriver::new("TEST_TS", &TEST_CHANNELS, num_points, shared.clone());
        (driver, shared)
    }

    // --- TimeSeries accumulator tests ---

    #[test]
    fn test_one_shot() {
        let mut series = TimeSeries::new(5, TimeSeriesMode::OneShot);
        (0..5).for_each(|sample| series.add_value(sample as f64));
        assert_eq!(series.count(), 5);
        assert_eq!(series.values(), vec![0.0, 1.0, 2.0, 3.0, 4.0]);

        // A full one-shot buffer ignores further samples.
        series.add_value(99.0);
        assert_eq!(series.count(), 5);
    }

    #[test]
    fn test_ring_buffer() {
        let mut series = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
        (0..6).for_each(|sample| series.add_value(sample as f64));
        assert_eq!(series.count(), 4);
        // Only the four newest samples survive, oldest first.
        assert_eq!(series.values(), vec![2.0, 3.0, 4.0, 5.0]);
    }

    #[test]
    fn test_ring_buffer_partial() {
        let mut series = TimeSeries::new(4, TimeSeriesMode::RingBuffer);
        for sample in [10.0, 20.0] {
            series.add_value(sample);
        }
        assert_eq!(series.count(), 2);
        assert_eq!(series.values(), vec![10.0, 20.0]);
    }

    #[test]
    fn test_reset() {
        let mut series = TimeSeries::new(3, TimeSeriesMode::OneShot);
        for sample in [1.0, 2.0] {
            series.add_value(sample);
        }
        series.reset();
        assert_eq!(series.count(), 0);
        assert!(series.values().is_empty());
    }

    #[test]
    fn test_resize() {
        let mut series = TimeSeries::new(5, TimeSeriesMode::OneShot);
        for sample in [1.0, 2.0] {
            series.add_value(sample);
        }
        series.resize(3);
        assert_eq!(series.num_points, 3);
        assert_eq!(series.count(), 0);
        assert!(series.values().is_empty());
    }

    #[test]
    fn test_set_mode() {
        let mut series = TimeSeries::new(5, TimeSeriesMode::OneShot);
        series.add_value(1.0);
        series.set_mode(TimeSeriesMode::RingBuffer);
        assert_eq!(series.mode, TimeSeriesMode::RingBuffer);
        assert_eq!(series.count(), 0);
    }

    // --- TS port driver tests (using a small channel set for simplicity) ---

    #[test]
    fn test_shared_ts_state_init() {
        let fresh = SharedTsState::new(3, 100);
        assert_eq!(fresh.buffers.len(), 3);
        assert_eq!(fresh.num_points, 100);
        assert!(!fresh.acquiring);
        assert_eq!(fresh.mode, TimeSeriesMode::OneShot);
    }

    #[test]
    fn test_ts_port_driver_create() {
        let (driver, _shared) = test_driver(100);
        assert_eq!(driver.base().port_name, "TEST_TS");
        assert_eq!(driver.num_channels, 3);
        assert!(!driver.base().flags.multi_device);
    }

    #[test]
    fn test_ts_port_driver_write_acquire() {
        let (mut driver, shared) = test_driver(100);
        let mut user = AsynUser::new(driver.params.ts_acquire);

        // Writing 1 starts acquisition...
        driver.write_int32(&mut user, 1).unwrap();
        assert!(shared.lock().acquiring);

        // ...and writing 0 stops it.
        driver.write_int32(&mut user, 0).unwrap();
        assert!(!shared.lock().acquiring);
    }

    #[test]
    fn test_ts_port_driver_write_num_points() {
        let (mut driver, shared) = test_driver(100);

        let mut user = AsynUser::new(driver.params.ts_num_points);
        driver.write_int32(&mut user, 50).unwrap();

        let state = shared.lock();
        assert_eq!(state.num_points, 50);
        for buf in state.buffers.iter() {
            assert_eq!(buf.num_points, 50);
        }
    }

    #[test]
    fn test_ts_port_driver_write_mode() {
        let (mut driver, shared) = test_driver(100);

        let mut user = AsynUser::new(driver.params.ts_acquire_mode);
        driver.write_int32(&mut user, 1).unwrap();

        let state = shared.lock();
        assert_eq!(state.mode, TimeSeriesMode::RingBuffer);
        for buf in state.buffers.iter() {
            assert_eq!(buf.mode, TimeSeriesMode::RingBuffer);
        }
    }

    #[test]
    fn test_ts_port_driver_update_waveforms() {
        let (mut driver, shared) = test_driver(10);

        // Put two samples into every channel; the guard is released at the
        // end of this block, before the driver needs the lock.
        {
            let mut state = shared.lock();
            state.acquiring = true;
            state.start_time = Some(Instant::now());
            for buf in state.buffers.iter_mut() {
                buf.add_value(42.0);
                buf.add_value(43.0);
            }
        }

        driver.update_waveform_params();

        // The current point reflects the two samples...
        let current_point = driver
            .base
            .get_int32_param(driver.params.ts_current_point, 0)
            .unwrap();
        assert_eq!(current_point, 2);

        // ...and the first channel waveform carries them.
        let waveform = driver
            .base
            .params
            .get_float64_array(driver.params.ts_channels[0], 0)
            .unwrap();
        assert_eq!(waveform[0], 42.0);
        assert_eq!(waveform[1], 43.0);
    }

    #[test]
    fn test_ts_port_driver_read_array() {
        let (mut driver, _shared) = test_driver(5);

        let user = AsynUser::new(driver.params.ts_time_axis);
        let mut out = vec![0.0; 5];
        let copied = driver.read_float64_array(&user, &mut out).unwrap();
        assert_eq!(copied, 5);
        assert_eq!(out, vec![0.0, 1.0, 2.0, 3.0, 4.0]);
    }

    #[test]
    fn test_ts_data_ingestion_oneshot() {
        let shared = shared_state(3, 3);
        let (tx, rx) = tokio::sync::mpsc::channel(16);

        // Acquisition must be on before any data is accepted.
        shared.lock().acquiring = true;

        let worker_state = shared.clone();
        let worker = std::thread::spawn(move || ts_data_thread(worker_state, rx));

        // Four rows for a three-point buffer: the last one is beyond capacity.
        let rows = [
            vec![1.0, 10.0, 100.0],
            vec![2.0, 20.0, 200.0],
            vec![3.0, 30.0, 300.0],
            vec![4.0, 40.0, 400.0],
        ];
        for values in rows {
            tx.blocking_send(TimeSeriesData { values }).unwrap();
        }

        // Dropping the sender closes the channel so the worker can finish.
        drop(tx);
        worker.join().unwrap();

        let state = shared.lock();
        assert_eq!(state.buffers[0].count(), 3);
        assert_eq!(state.buffers[0].values(), vec![1.0, 2.0, 3.0]);
        assert_eq!(state.buffers[1].values(), vec![10.0, 20.0, 30.0]);
        assert_eq!(state.buffers[2].values(), vec![100.0, 200.0, 300.0]);
        assert!(!state.acquiring); // auto-stopped
    }

    #[test]
    fn test_ts_data_ingestion_not_acquiring() {
        // Acquisition is off by default, so the row must be discarded.
        let shared = shared_state(3, 10);
        let (tx, rx) = tokio::sync::mpsc::channel(16);

        let worker_state = shared.clone();
        let worker = std::thread::spawn(move || ts_data_thread(worker_state, rx));

        tx.blocking_send(TimeSeriesData { values: vec![1.0, 2.0, 3.0] }).unwrap();

        drop(tx);
        worker.join().unwrap();

        let state = shared.lock();
        assert_eq!(state.buffers[0].count(), 0);
    }

    #[test]
    fn test_build_ts_registry() {
        let (driver, _shared) = test_driver(10);
        let registry = build_ts_registry(&driver.params);

        // Control params, channel waveforms and the time axis.
        let expected_keys = [
            "TSAcquire",
            "TSAcquiring",
            "TSRead",
            "TSNumPoints",
            "TSCurrentPoint",
            "TSAcquireMode",
            "ChA",
            "ChB",
            "ChC",
            "TSTimeAxis",
        ];
        for key in expected_keys {
            assert!(registry.contains_key(key), "missing registry key {}", key);
        }

        // 14 control + 1 time axis + 3 channels = 18
        assert_eq!(registry.len(), 18);
    }

    #[test]
    fn test_build_ts_registry_stats_channels() {
        let shared = shared_state(NUM_STATS_TS_CHANNELS, 10);
        let driver = TimeSeriesPortDriver::new("STATS_TS", &STATS_TS_CHANNEL_NAMES, 10, shared);
        let registry = build_ts_registry(&driver.params);

        // 14 control + 1 time axis + 23 channels = 38
        assert_eq!(registry.len(), 38);
        assert!(registry.contains_key("TSMinValue"));
        assert!(registry.contains_key("TSTimestamp"));
    }

    #[test]
    fn test_create_ts_port_runtime() {
        let (_tx, rx) = tokio::sync::mpsc::channel(16);
        let (handle, params, _actor_jh, _data_jh) =
            create_ts_port_runtime("TEST_TS_RT", &TEST_CHANNELS, 100, rx);
        assert_eq!(handle.port_name(), "TEST_TS_RT");
        assert_eq!(params.ts_channels.len(), 3);
        handle.shutdown();
    }
}