Skip to main content

hdds_recording/
player.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! DDS message replay/playback.
5//!
6//! Reads recorded messages and publishes them with timing control.
7
8use crate::filter::TopicFilter;
9use crate::format::{HddsReader, Message};
10use std::path::{Path, PathBuf};
11use std::time::{Duration, Instant};
12use thiserror::Error;
13
14/// Playback speed control.
15#[derive(Debug, Clone, Copy, Default)]
16pub enum PlaybackSpeed {
17    /// Real-time playback (1.0x).
18    #[default]
19    Realtime,
20    /// Fixed speed multiplier (e.g., 2.0 = 2x faster).
21    Speed(f64),
22    /// As fast as possible (no timing).
23    Unlimited,
24}
25
26impl PlaybackSpeed {
27    /// Get the speed multiplier (Unlimited returns f64::INFINITY).
28    pub fn multiplier(&self) -> f64 {
29        match self {
30            Self::Realtime => 1.0,
31            Self::Speed(s) => *s,
32            Self::Unlimited => f64::INFINITY,
33        }
34    }
35
36    /// Calculate delay for a given timestamp delta.
37    pub fn delay_for(&self, delta_nanos: u64) -> Option<Duration> {
38        match self {
39            Self::Unlimited => None,
40            Self::Realtime => Some(Duration::from_nanos(delta_nanos)),
41            Self::Speed(s) => {
42                if *s <= 0.0 {
43                    None
44                } else {
45                    Some(Duration::from_nanos((delta_nanos as f64 / s) as u64))
46                }
47            }
48        }
49    }
50}
51
52/// Player configuration.
53#[derive(Debug, Clone)]
54pub struct PlayerConfig {
55    /// Input file path.
56    pub input_path: PathBuf,
57
58    /// Playback speed.
59    pub speed: PlaybackSpeed,
60
61    /// Topic filter (None = all topics).
62    pub topic_filter: Option<TopicFilter>,
63
64    /// Loop playback.
65    pub loop_playback: bool,
66
67    /// Start offset (skip first N nanoseconds).
68    pub start_offset_nanos: u64,
69
70    /// End time (stop after N nanoseconds, 0 = play all).
71    pub end_time_nanos: u64,
72}
73
74impl PlayerConfig {
75    /// Create a new player config.
76    pub fn new<P: AsRef<Path>>(input_path: P) -> Self {
77        Self {
78            input_path: input_path.as_ref().to_path_buf(),
79            speed: PlaybackSpeed::Realtime,
80            topic_filter: None,
81            loop_playback: false,
82            start_offset_nanos: 0,
83            end_time_nanos: 0,
84        }
85    }
86
87    /// Set playback speed.
88    pub fn speed(mut self, speed: PlaybackSpeed) -> Self {
89        self.speed = speed;
90        self
91    }
92
93    /// Set speed as multiplier.
94    pub fn speed_multiplier(mut self, multiplier: f64) -> Self {
95        self.speed = if multiplier <= 0.0 {
96            PlaybackSpeed::Unlimited
97        } else if (multiplier - 1.0).abs() < 0.001 {
98            PlaybackSpeed::Realtime
99        } else {
100            PlaybackSpeed::Speed(multiplier)
101        };
102        self
103    }
104
105    /// Set topic filter.
106    pub fn topic_filter(mut self, filter: TopicFilter) -> Self {
107        self.topic_filter = Some(filter);
108        self
109    }
110
111    /// Enable loop playback.
112    pub fn loop_playback(mut self, enable: bool) -> Self {
113        self.loop_playback = enable;
114        self
115    }
116
117    /// Set start offset.
118    pub fn start_offset(mut self, offset: Duration) -> Self {
119        self.start_offset_nanos = offset.as_nanos() as u64;
120        self
121    }
122
123    /// Set end time.
124    pub fn end_time(mut self, end: Duration) -> Self {
125        self.end_time_nanos = end.as_nanos() as u64;
126        self
127    }
128}
129
130/// Player errors.
131#[derive(Debug, Error)]
132pub enum PlayerError {
133    #[error("I/O error: {0}")]
134    Io(#[from] std::io::Error),
135
136    #[error("Format error: {0}")]
137    Format(#[from] crate::format::FormatError),
138
139    #[error("File not found: {0}")]
140    FileNotFound(PathBuf),
141
142    #[error("Playback cancelled")]
143    Cancelled,
144}
145
146/// Playback statistics.
147#[derive(Debug, Clone, Default)]
148pub struct PlaybackStats {
149    /// Total messages played.
150    pub messages_played: u64,
151
152    /// Total messages skipped (filtered).
153    pub messages_skipped: u64,
154
155    /// Playback duration in seconds.
156    pub duration_secs: f64,
157
158    /// Actual messages per second.
159    pub messages_per_second: f64,
160
161    /// Recording duration in seconds.
162    pub recording_duration_secs: f64,
163
164    /// Number of loops completed.
165    pub loops_completed: u32,
166}
167
168/// DDS message player.
169pub struct Player {
170    config: PlayerConfig,
171    reader: Option<HddsReader>,
172    last_timestamp: u64,
173    playback_start: Option<Instant>,
174    stats: PlaybackStats,
175    cancelled: bool,
176}
177
178impl Player {
179    /// Create a new player.
180    pub fn new(config: PlayerConfig) -> Self {
181        Self {
182            config,
183            reader: None,
184            last_timestamp: 0,
185            playback_start: None,
186            stats: PlaybackStats::default(),
187            cancelled: false,
188        }
189    }
190
191    /// Open the recording file.
192    pub fn open(&mut self) -> Result<(), PlayerError> {
193        if !self.config.input_path.exists() {
194            return Err(PlayerError::FileNotFound(self.config.input_path.clone()));
195        }
196
197        let reader = HddsReader::open(&self.config.input_path)?;
198
199        self.stats.recording_duration_secs = reader.duration_nanos() as f64 / 1_000_000_000.0;
200        self.reader = Some(reader);
201        self.last_timestamp = 0;
202        self.playback_start = Some(Instant::now());
203
204        tracing::info!(
205            "Opened {} ({} messages, {:.1}s)",
206            self.config.input_path.display(),
207            self.reader.as_ref().map(|r| r.message_count()).unwrap_or(0),
208            self.stats.recording_duration_secs
209        );
210
211        Ok(())
212    }
213
214    /// Get the next message to play.
215    ///
216    /// Returns `Ok(None)` when playback is complete.
217    /// Handles timing based on playback speed.
218    pub fn next_message(&mut self) -> Result<Option<Message>, PlayerError> {
219        if self.cancelled {
220            return Err(PlayerError::Cancelled);
221        }
222
223        loop {
224            let reader = match &mut self.reader {
225                Some(r) => r,
226                None => return Ok(None),
227            };
228
229            match reader.read_message()? {
230                Some(msg) => {
231                    // Apply time filters
232                    if msg.timestamp_nanos < self.config.start_offset_nanos {
233                        self.stats.messages_skipped += 1;
234                        continue;
235                    }
236
237                    if self.config.end_time_nanos > 0
238                        && msg.timestamp_nanos > self.config.end_time_nanos
239                    {
240                        // End time reached
241                        if self.config.loop_playback {
242                            self.restart()?;
243                            continue;
244                        }
245                        return Ok(None);
246                    }
247
248                    // Apply topic filter
249                    if let Some(ref filter) = self.config.topic_filter {
250                        if !filter.matches(&msg.topic_name) {
251                            self.stats.messages_skipped += 1;
252                            continue;
253                        }
254                    }
255
256                    // Apply timing
257                    if msg.timestamp_nanos > self.last_timestamp {
258                        let delta = msg.timestamp_nanos - self.last_timestamp;
259                        if let Some(delay) = self.config.speed.delay_for(delta) {
260                            std::thread::sleep(delay);
261                        }
262                    }
263
264                    self.last_timestamp = msg.timestamp_nanos;
265                    self.stats.messages_played += 1;
266
267                    return Ok(Some(msg));
268                }
269                None => {
270                    // End of file
271                    if self.config.loop_playback {
272                        self.restart()?;
273                        continue;
274                    }
275
276                    // Update final stats
277                    if let Some(start) = self.playback_start {
278                        self.stats.duration_secs = start.elapsed().as_secs_f64();
279                        if self.stats.duration_secs > 0.0 {
280                            self.stats.messages_per_second =
281                                self.stats.messages_played as f64 / self.stats.duration_secs;
282                        }
283                    }
284
285                    return Ok(None);
286                }
287            }
288        }
289    }
290
291    /// Restart playback from beginning.
292    fn restart(&mut self) -> Result<(), PlayerError> {
293        self.reader = None;
294        let reader = HddsReader::open(&self.config.input_path)?;
295        self.reader = Some(reader);
296        self.last_timestamp = 0;
297        self.stats.loops_completed += 1;
298
299        tracing::debug!("Restarting playback (loop {})", self.stats.loops_completed);
300
301        Ok(())
302    }
303
304    /// Cancel playback.
305    pub fn cancel(&mut self) {
306        self.cancelled = true;
307    }
308
309    /// Check if playback is complete.
310    pub fn is_complete(&self) -> bool {
311        self.reader.is_none() || self.cancelled
312    }
313
314    /// Get playback statistics.
315    pub fn stats(&self) -> &PlaybackStats {
316        &self.stats
317    }
318
319    /// Get recording metadata.
320    pub fn metadata(&self) -> Option<&crate::format::RecordingMetadata> {
321        self.reader.as_ref().map(|r| r.metadata())
322    }
323
324    /// Get total message count in recording.
325    pub fn total_messages(&self) -> u64 {
326        self.reader.as_ref().map(|r| r.message_count()).unwrap_or(0)
327    }
328
329    /// Get configuration.
330    pub fn config(&self) -> &PlayerConfig {
331        &self.config
332    }
333
334    /// Iterate over all messages.
335    pub fn messages(mut self) -> impl Iterator<Item = Result<Message, PlayerError>> {
336        std::iter::from_fn(move || match self.next_message() {
337            Ok(Some(msg)) => Some(Ok(msg)),
338            Ok(None) => None,
339            Err(e) => Some(Err(e)),
340        })
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use crate::format::{HddsFormat, HddsWriter, RecordingMetadata};
348    use tempfile::tempdir;
349
350    fn create_test_recording(path: &Path, count: u64) {
351        let metadata = RecordingMetadata::default();
352        let mut writer = HddsWriter::create(path, metadata).expect("create");
353
354        for i in 0..count {
355            let msg = Message {
356                timestamp_nanos: i * 1_000_000, // 1ms apart
357                topic_name: "TestTopic".into(),
358                type_name: "TestType".into(),
359                writer_guid: "01020304050607080910111213141516".into(),
360                sequence_number: i,
361                payload: vec![i as u8],
362                qos_hash: 0,
363            };
364            writer.write_message(&msg).expect("write");
365        }
366
367        writer.finalize().expect("finalize");
368    }
369
370    #[test]
371    fn test_playback_speed_delay() {
372        let realtime = PlaybackSpeed::Realtime;
373        assert_eq!(
374            realtime.delay_for(1_000_000),
375            Some(Duration::from_nanos(1_000_000))
376        );
377
378        let double = PlaybackSpeed::Speed(2.0);
379        assert_eq!(
380            double.delay_for(1_000_000),
381            Some(Duration::from_nanos(500_000))
382        );
383
384        let unlimited = PlaybackSpeed::Unlimited;
385        assert_eq!(unlimited.delay_for(1_000_000), None);
386    }
387
388    #[test]
389    fn test_player_config_builder() {
390        let config = PlayerConfig::new("/tmp/test.hdds")
391            .speed_multiplier(2.0)
392            .loop_playback(true)
393            .start_offset(Duration::from_secs(10));
394
395        assert!(matches!(config.speed, PlaybackSpeed::Speed(s) if (s - 2.0).abs() < 0.001));
396        assert!(config.loop_playback);
397        assert_eq!(config.start_offset_nanos, 10_000_000_000);
398    }
399
400    #[test]
401    fn test_player_open_and_read() {
402        let dir = tempdir().expect("tempdir");
403        let path = dir.path().join("test.hdds");
404
405        create_test_recording(&path, 10);
406
407        let config = PlayerConfig::new(&path).speed(PlaybackSpeed::Unlimited);
408        let mut player = Player::new(config);
409
410        player.open().expect("open");
411        assert_eq!(player.total_messages(), 10);
412
413        let mut count = 0;
414        while let Some(_msg) = player.next_message().expect("next") {
415            count += 1;
416        }
417
418        assert_eq!(count, 10);
419        assert_eq!(player.stats().messages_played, 10);
420    }
421
422    #[test]
423    fn test_player_with_filter() {
424        let dir = tempdir().expect("tempdir");
425        let path = dir.path().join("test.hdds");
426
427        // Create recording with mixed topics
428        {
429            let metadata = RecordingMetadata::default();
430            let mut writer = HddsWriter::create(&path, metadata).expect("create");
431
432            for i in 0..10 {
433                let topic = if i % 2 == 0 { "TopicA" } else { "TopicB" };
434                let msg = Message {
435                    timestamp_nanos: i * 1_000_000,
436                    topic_name: topic.into(),
437                    type_name: "Type".into(),
438                    writer_guid: "guid".into(),
439                    sequence_number: i,
440                    payload: vec![],
441                    qos_hash: 0,
442                };
443                writer.write_message(&msg).expect("write");
444            }
445            writer.finalize().expect("finalize");
446        }
447
448        let config = PlayerConfig::new(&path)
449            .speed(PlaybackSpeed::Unlimited)
450            .topic_filter(TopicFilter::include(vec!["TopicA".into()]));
451
452        let mut player = Player::new(config);
453        player.open().expect("open");
454
455        let mut count = 0;
456        while let Some(_msg) = player.next_message().expect("next") {
457            count += 1;
458        }
459
460        assert_eq!(count, 5); // Only TopicA messages
461        assert_eq!(player.stats().messages_skipped, 5);
462    }
463
464    #[test]
465    fn test_player_cancel() {
466        let dir = tempdir().expect("tempdir");
467        let path = dir.path().join("test.hdds");
468
469        create_test_recording(&path, 100);
470
471        let config = PlayerConfig::new(&path).speed(PlaybackSpeed::Unlimited);
472        let mut player = Player::new(config);
473
474        player.open().expect("open");
475
476        // Read a few messages
477        for _ in 0..5 {
478            player.next_message().expect("next");
479        }
480
481        // Cancel
482        player.cancel();
483
484        // Next should return error
485        assert!(player.next_message().is_err());
486        assert!(player.is_complete());
487    }
488}