1use crate::filter::TopicFilter;
9use crate::format::{HddsReader, Message};
10use std::path::{Path, PathBuf};
11use std::time::{Duration, Instant};
12use thiserror::Error;
13
/// How quickly recorded messages are replayed relative to their recorded timing.
#[derive(Debug, Clone, Copy, Default)]
pub enum PlaybackSpeed {
    /// Replay with the recorded gaps between messages (factor `1.0`).
    #[default]
    Realtime,
    /// Replay with every recorded gap divided by the given factor
    /// (`2.0` halves each gap, `0.5` doubles it).
    ///
    /// A factor that is zero, negative or NaN cannot be used for pacing;
    /// [`PlaybackSpeed::delay_for`] reports "no delay" for such values.
    Speed(f64),
    /// Replay as fast as possible, never waiting between messages.
    Unlimited,
}

impl PlaybackSpeed {
    /// Returns the speed as a plain factor.
    ///
    /// `Realtime` is `1.0`, `Unlimited` is positive infinity and `Speed`
    /// returns its stored value unchanged (including unusable values such as
    /// zero, negatives or NaN).
    pub fn multiplier(&self) -> f64 {
        match *self {
            Self::Realtime => 1.0,
            Self::Speed(factor) => factor,
            Self::Unlimited => f64::INFINITY,
        }
    }

    /// Converts a recorded gap of `delta_nanos` nanoseconds into the time to
    /// wait before delivering the next message.
    ///
    /// Returns `None` when no waiting should happen at all: for `Unlimited`
    /// and for a `Speed` factor that is zero, negative or NaN.
    pub fn delay_for(&self, delta_nanos: u64) -> Option<Duration> {
        match *self {
            Self::Realtime => Some(Duration::from_nanos(delta_nanos)),
            // `factor > 0.0` is false for NaN as well as for non-positive
            // values, so every unusable factor falls through to `None`
            // instead of being turned into a bogus zero-length delay.
            Self::Speed(factor) if factor > 0.0 => {
                // The float-to-integer `as` cast saturates, so an extremely
                // small factor yields the longest representable delay rather
                // than wrapping around.
                let scaled_nanos = delta_nanos as f64 / factor;
                Some(Duration::from_nanos(scaled_nanos as u64))
            }
            Self::Speed(_) | Self::Unlimited => None,
        }
    }
}
51
52#[derive(Debug, Clone)]
54pub struct PlayerConfig {
55 pub input_path: PathBuf,
57
58 pub speed: PlaybackSpeed,
60
61 pub topic_filter: Option<TopicFilter>,
63
64 pub loop_playback: bool,
66
67 pub start_offset_nanos: u64,
69
70 pub end_time_nanos: u64,
72}
73
74impl PlayerConfig {
75 pub fn new<P: AsRef<Path>>(input_path: P) -> Self {
77 Self {
78 input_path: input_path.as_ref().to_path_buf(),
79 speed: PlaybackSpeed::Realtime,
80 topic_filter: None,
81 loop_playback: false,
82 start_offset_nanos: 0,
83 end_time_nanos: 0,
84 }
85 }
86
87 pub fn speed(mut self, speed: PlaybackSpeed) -> Self {
89 self.speed = speed;
90 self
91 }
92
93 pub fn speed_multiplier(mut self, multiplier: f64) -> Self {
95 self.speed = if multiplier <= 0.0 {
96 PlaybackSpeed::Unlimited
97 } else if (multiplier - 1.0).abs() < 0.001 {
98 PlaybackSpeed::Realtime
99 } else {
100 PlaybackSpeed::Speed(multiplier)
101 };
102 self
103 }
104
105 pub fn topic_filter(mut self, filter: TopicFilter) -> Self {
107 self.topic_filter = Some(filter);
108 self
109 }
110
111 pub fn loop_playback(mut self, enable: bool) -> Self {
113 self.loop_playback = enable;
114 self
115 }
116
117 pub fn start_offset(mut self, offset: Duration) -> Self {
119 self.start_offset_nanos = offset.as_nanos() as u64;
120 self
121 }
122
123 pub fn end_time(mut self, end: Duration) -> Self {
125 self.end_time_nanos = end.as_nanos() as u64;
126 self
127 }
128}
129
130#[derive(Debug, Error)]
132pub enum PlayerError {
133 #[error("I/O error: {0}")]
134 Io(#[from] std::io::Error),
135
136 #[error("Format error: {0}")]
137 Format(#[from] crate::format::FormatError),
138
139 #[error("File not found: {0}")]
140 FileNotFound(PathBuf),
141
142 #[error("Playback cancelled")]
143 Cancelled,
144}
145
/// Counters and timings collected by the player during playback.
///
/// All values start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct PlaybackStats {
    /// Number of messages handed out to the caller.
    pub messages_played: u64,

    /// Number of messages read but not delivered, because they preceded the
    /// start offset or were rejected by the topic filter.
    pub messages_skipped: u64,

    /// Wall-clock seconds between opening the recording and the end of
    /// playback; filled in by the player when playback runs to completion.
    pub duration_secs: f64,

    /// `messages_played` divided by `duration_secs`; filled in together with
    /// `duration_secs` when that duration is greater than zero.
    pub messages_per_second: f64,

    /// Length of the recording in seconds, as reported by the reader when the
    /// file is opened.
    pub recording_duration_secs: f64,

    /// Number of times playback has been restarted from the beginning.
    pub loops_completed: u32,
}
167
168pub struct Player {
170 config: PlayerConfig,
171 reader: Option<HddsReader>,
172 last_timestamp: u64,
173 playback_start: Option<Instant>,
174 stats: PlaybackStats,
175 cancelled: bool,
176}
177
178impl Player {
179 pub fn new(config: PlayerConfig) -> Self {
181 Self {
182 config,
183 reader: None,
184 last_timestamp: 0,
185 playback_start: None,
186 stats: PlaybackStats::default(),
187 cancelled: false,
188 }
189 }
190
191 pub fn open(&mut self) -> Result<(), PlayerError> {
193 if !self.config.input_path.exists() {
194 return Err(PlayerError::FileNotFound(self.config.input_path.clone()));
195 }
196
197 let reader = HddsReader::open(&self.config.input_path)?;
198
199 self.stats.recording_duration_secs = reader.duration_nanos() as f64 / 1_000_000_000.0;
200 self.reader = Some(reader);
201 self.last_timestamp = 0;
202 self.playback_start = Some(Instant::now());
203
204 tracing::info!(
205 "Opened {} ({} messages, {:.1}s)",
206 self.config.input_path.display(),
207 self.reader.as_ref().map(|r| r.message_count()).unwrap_or(0),
208 self.stats.recording_duration_secs
209 );
210
211 Ok(())
212 }
213
214 pub fn next_message(&mut self) -> Result<Option<Message>, PlayerError> {
219 if self.cancelled {
220 return Err(PlayerError::Cancelled);
221 }
222
223 loop {
224 let reader = match &mut self.reader {
225 Some(r) => r,
226 None => return Ok(None),
227 };
228
229 match reader.read_message()? {
230 Some(msg) => {
231 if msg.timestamp_nanos < self.config.start_offset_nanos {
233 self.stats.messages_skipped += 1;
234 continue;
235 }
236
237 if self.config.end_time_nanos > 0
238 && msg.timestamp_nanos > self.config.end_time_nanos
239 {
240 if self.config.loop_playback {
242 self.restart()?;
243 continue;
244 }
245 return Ok(None);
246 }
247
248 if let Some(ref filter) = self.config.topic_filter {
250 if !filter.matches(&msg.topic_name) {
251 self.stats.messages_skipped += 1;
252 continue;
253 }
254 }
255
256 if msg.timestamp_nanos > self.last_timestamp {
258 let delta = msg.timestamp_nanos - self.last_timestamp;
259 if let Some(delay) = self.config.speed.delay_for(delta) {
260 std::thread::sleep(delay);
261 }
262 }
263
264 self.last_timestamp = msg.timestamp_nanos;
265 self.stats.messages_played += 1;
266
267 return Ok(Some(msg));
268 }
269 None => {
270 if self.config.loop_playback {
272 self.restart()?;
273 continue;
274 }
275
276 if let Some(start) = self.playback_start {
278 self.stats.duration_secs = start.elapsed().as_secs_f64();
279 if self.stats.duration_secs > 0.0 {
280 self.stats.messages_per_second =
281 self.stats.messages_played as f64 / self.stats.duration_secs;
282 }
283 }
284
285 return Ok(None);
286 }
287 }
288 }
289 }
290
291 fn restart(&mut self) -> Result<(), PlayerError> {
293 self.reader = None;
294 let reader = HddsReader::open(&self.config.input_path)?;
295 self.reader = Some(reader);
296 self.last_timestamp = 0;
297 self.stats.loops_completed += 1;
298
299 tracing::debug!("Restarting playback (loop {})", self.stats.loops_completed);
300
301 Ok(())
302 }
303
304 pub fn cancel(&mut self) {
306 self.cancelled = true;
307 }
308
309 pub fn is_complete(&self) -> bool {
311 self.reader.is_none() || self.cancelled
312 }
313
314 pub fn stats(&self) -> &PlaybackStats {
316 &self.stats
317 }
318
319 pub fn metadata(&self) -> Option<&crate::format::RecordingMetadata> {
321 self.reader.as_ref().map(|r| r.metadata())
322 }
323
324 pub fn total_messages(&self) -> u64 {
326 self.reader.as_ref().map(|r| r.message_count()).unwrap_or(0)
327 }
328
329 pub fn config(&self) -> &PlayerConfig {
331 &self.config
332 }
333
334 pub fn messages(mut self) -> impl Iterator<Item = Result<Message, PlayerError>> {
336 std::iter::from_fn(move || match self.next_message() {
337 Ok(Some(msg)) => Some(Ok(msg)),
338 Ok(None) => None,
339 Err(e) => Some(Err(e)),
340 })
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::{HddsFormat, HddsWriter, RecordingMetadata};
    use tempfile::tempdir;

    /// Writes `count` messages on "TestTopic", spaced 1 ms apart, into a new
    /// recording at `path`.
    fn create_test_recording(path: &Path, count: u64) {
        let mut writer = HddsWriter::create(path, RecordingMetadata::default()).expect("create");

        for seq in 0..count {
            writer
                .write_message(&Message {
                    timestamp_nanos: seq * 1_000_000,
                    topic_name: "TestTopic".into(),
                    type_name: "TestType".into(),
                    writer_guid: "01020304050607080910111213141516".into(),
                    sequence_number: seq,
                    payload: vec![seq as u8],
                    qos_hash: 0,
                })
                .expect("write");
        }

        writer.finalize().expect("finalize");
    }

    /// Pulls messages until the player reports the end and returns how many
    /// were delivered.
    fn drain(player: &mut Player) -> usize {
        let mut delivered = 0;
        while player.next_message().expect("next").is_some() {
            delivered += 1;
        }
        delivered
    }

    #[test]
    fn test_playback_speed_delay() {
        let cases = [
            (
                PlaybackSpeed::Realtime,
                Some(Duration::from_nanos(1_000_000)),
            ),
            (
                PlaybackSpeed::Speed(2.0),
                Some(Duration::from_nanos(500_000)),
            ),
            (PlaybackSpeed::Unlimited, None),
        ];

        for (speed, expected) in cases.iter() {
            assert_eq!(speed.delay_for(1_000_000), *expected);
        }
    }

    #[test]
    fn test_player_config_builder() {
        let config = PlayerConfig::new("/tmp/test.hdds")
            .speed_multiplier(2.0)
            .loop_playback(true)
            .start_offset(Duration::from_secs(10));

        match config.speed {
            PlaybackSpeed::Speed(factor) => assert!((factor - 2.0).abs() < 0.001),
            other => panic!("unexpected speed: {:?}", other),
        }
        assert!(config.loop_playback);
        assert_eq!(config.start_offset_nanos, 10_000_000_000);
    }

    #[test]
    fn test_player_open_and_read() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test.hdds");
        create_test_recording(&path, 10);

        let mut player = Player::new(PlayerConfig::new(&path).speed(PlaybackSpeed::Unlimited));
        player.open().expect("open");
        assert_eq!(player.total_messages(), 10);

        let delivered = drain(&mut player);

        assert_eq!(delivered, 10);
        assert_eq!(player.stats().messages_played, 10);
    }

    #[test]
    fn test_player_with_filter() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test.hdds");

        // Ten messages alternating between TopicA (even) and TopicB (odd).
        {
            let mut writer =
                HddsWriter::create(&path, RecordingMetadata::default()).expect("create");

            for i in 0..10 {
                let topic = match i % 2 {
                    0 => "TopicA",
                    _ => "TopicB",
                };
                writer
                    .write_message(&Message {
                        timestamp_nanos: i * 1_000_000,
                        topic_name: topic.into(),
                        type_name: "Type".into(),
                        writer_guid: "guid".into(),
                        sequence_number: i,
                        payload: vec![],
                        qos_hash: 0,
                    })
                    .expect("write");
            }
            writer.finalize().expect("finalize");
        }

        let only_topic_a = TopicFilter::include(vec!["TopicA".into()]);
        let config = PlayerConfig::new(&path)
            .speed(PlaybackSpeed::Unlimited)
            .topic_filter(only_topic_a);

        let mut player = Player::new(config);
        player.open().expect("open");

        let delivered = drain(&mut player);

        assert_eq!(delivered, 5);
        assert_eq!(player.stats().messages_skipped, 5);
    }

    #[test]
    fn test_player_cancel() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("test.hdds");
        create_test_recording(&path, 100);

        let mut player = Player::new(PlayerConfig::new(&path).speed(PlaybackSpeed::Unlimited));
        player.open().expect("open");

        // Consume a few messages before cancelling mid-recording.
        for _ in 0..5 {
            player.next_message().expect("next");
        }

        player.cancel();

        assert!(player.next_message().is_err());
        assert!(player.is_complete());
    }
}