// rustbag 0.1.1
//
// A high-performance ROS 2 bag player
//
// Copyright 2025 Ivo Ivanov.
// Copyright 2018 Open Source Robotics Foundation, Inc.
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


use atomic_enum::atomic_enum;
use std::sync::{atomic, Arc, Condvar, Mutex};

use std::time::{Duration, Instant};

/// A boolean flag that threads can block on until it becomes `true`.
///
/// The flag is a `Mutex<bool>` paired with a `Condvar`, both stored behind an `Arc`.
pub struct Event {
    data: Arc<(Mutex<bool>, Condvar)>,
}

impl Event {
    /// Creates an event whose flag starts out as `initial_state`.
    pub fn new(initial_state: bool) -> Self {
        let shared = (Mutex::new(initial_state), Condvar::new());
        Self {
            data: Arc::new(shared),
        }
    }

    /// Overwrites the flag with `state` and notifies every thread waiting on the event.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn set(&self, state: bool) {
        let (flag_mutex, waiters) = &*self.data;
        // The guard stays alive until the end of the function, so the
        // notification is sent while the lock is still held.
        let mut flag = flag_mutex.lock().unwrap();
        *flag = state;
        waiters.notify_all();
    }

    /// Blocks the calling thread until the flag is `true`; returns immediately if it already is.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn wait(&self) {
        let (flag_mutex, waiters) = &*self.data;
        let guard = waiters
            .wait_while(flag_mutex.lock().unwrap(), |is_set| !*is_set)
            .unwrap();
        drop(guard);
    }
}

/// A stopwatch whose reading is wall-clock time multiplied by a playback rate.
///
/// * `duration` – scaled time accumulated so far by [`Stopwatch::pause`].
/// * `instant` – reference point set by `new`, `resume` and `reset`; the running
///   segment is measured from here.
/// * `rate` – factor applied to the wall-clock time measured from `instant`.
///
/// The struct does not record whether it is paused. Callers must pair every
/// `pause` with a later `resume`.
pub struct Stopwatch {
    pub(crate) duration: Duration,
    pub(crate) instant: Instant,
    pub(crate) rate: f64,
}

impl Default for Stopwatch {
    /// Equivalent to [`Stopwatch::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Stopwatch {
    /// Creates a stopwatch with no accumulated time, a rate of `1.0`, and the
    /// reference instant set to now.
    pub fn new() -> Self {
        Self {
            duration: Duration::ZERO,
            instant: Instant::now(),
            rate: 1.,
        }
    }

    /// Changes the rate used for the time measured from now on.
    ///
    /// The segment measured so far is first folded into `duration` at the old
    /// rate, then the reference instant is restarted so only subsequent time is
    /// scaled by `new_rate`.
    pub fn set_rate(&mut self, new_rate: f64) {
        self.pause();
        self.rate = new_rate;
        self.resume()
    }

    /// Adds the scaled time since the reference instant to `duration`.
    ///
    /// The reference instant is left untouched, so calling this again without an
    /// intervening [`Stopwatch::resume`] adds the same interval (plus whatever
    /// has passed since) a second time.
    pub fn pause(&mut self) {
        self.duration += self.elapsed_since_last_pause();
    }

    /// Restarts the running segment by setting the reference instant to now.
    pub fn resume(&mut self) {
        self.instant = Instant::now();
    }

    /// Clears the accumulated time and restarts the running segment from now.
    /// The rate is kept.
    pub fn reset(&mut self) {
        self.duration = Duration::ZERO;
        self.instant = Instant::now();
    }

    /// Returns the wall-clock time since the reference instant multiplied by `rate`.
    ///
    /// If the product cannot be represented as a `Duration` (negative, NaN or too
    /// large, e.g. because of a negative or non-finite rate) this returns
    /// `Duration::ZERO` instead of panicking. A panic here would also poison any
    /// mutex the stopwatch is stored in.
    pub fn elapsed_since_last_pause(&self) -> Duration {
        let scaled_secs = self.instant.elapsed().as_secs_f64() * self.rate;
        Duration::try_from_secs_f64(scaled_secs).unwrap_or(Duration::ZERO)
    }

    /// Returns the accumulated time plus the currently running segment.
    ///
    /// Only correct in the non-paused state: after `pause` (and before `resume`)
    /// the running segment is already part of `duration` and would be counted
    /// twice.
    pub fn elapsed(&self) -> Duration {
        self.duration + self.elapsed_since_last_pause()
    }
}

/// Playback mode tracked by `PlayerClock`.
///
/// `PlayerClock` stores this value through the `AtomicPlaybackState` wrapper,
/// which is expected to come from the `#[atomic_enum]` attribute below.
#[atomic_enum]
#[derive(PartialEq, Eq)]
pub enum PlaybackState {
    /// State a new `PlayerClock` starts in, and the state `PlayerClock::resume` stores.
    Running = 0,
    /// State stored by `PlayerClock::pause`; the only state from which
    /// `PlayerClock::step_once` performs a transition.
    Paused = 1,
    /// Entered from `Paused` by `PlayerClock::step_once`. When
    /// `PlayerClock::wait_if_paused` observes this state it calls `pause` again.
    StepOnce = 2,
}

/// Playback clock: a rate-scaled [`Stopwatch`] plus a pause / single-step state machine.
///
/// * `stop_watch` – measures the playback time; guarded by a mutex.
/// * `state` – current [`PlaybackState`], stored atomically.
/// * `is_running_event` – flag that [`PlayerClock::wait_if_paused`] blocks on;
///   `false` while paused.
pub struct PlayerClock {
    pub(crate) stop_watch: Mutex<Stopwatch>,
    state: AtomicPlaybackState,
    is_running_event: Event,
}

impl Default for PlayerClock {
    /// Equivalent to [`PlayerClock::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl PlayerClock {
    /// Creates a clock in the `Running` state with a fresh stopwatch and the
    /// running event already set.
    pub fn new() -> Self {
        Self {
            stop_watch: Mutex::new(Stopwatch::new()),
            state: AtomicPlaybackState::new(PlaybackState::Running),
            is_running_event: Event::new(true),
        }
    }

    /// Switches to `Paused` and clears the running event.
    ///
    /// The stopwatch is only paused when the clock was not already paused.
    /// `Stopwatch::pause` adds the time since its reference instant on every
    /// call, so pausing an already paused clock would count that interval twice.
    ///
    /// # Panics
    ///
    /// Panics if a mutex inside the clock is poisoned.
    pub fn pause(&self) {
        let previous = self
            .state
            .swap(PlaybackState::Paused, atomic::Ordering::SeqCst);
        if previous != PlaybackState::Paused {
            self.stop_watch.lock().unwrap().pause();
        }
        self.is_running_event.set(false);
    }

    /// Returns `true` when the state is exactly `Paused` (not `StepOnce`).
    pub fn is_paused(&self) -> bool {
        self.state.load(atomic::Ordering::SeqCst) == PlaybackState::Paused
    }

    /// Switches to `Running` and sets the running event.
    ///
    /// The stopwatch is only resumed when the clock was actually paused.
    /// `Stopwatch::resume` resets the reference instant, so calling it while the
    /// stopwatch is running (state `Running` or `StepOnce`) would discard the
    /// time measured since that instant.
    ///
    /// # Panics
    ///
    /// Panics if a mutex inside the clock is poisoned.
    pub fn resume(&self) {
        let previous = self
            .state
            .swap(PlaybackState::Running, atomic::Ordering::SeqCst);
        if previous == PlaybackState::Paused {
            self.stop_watch.lock().unwrap().resume();
        }
        self.is_running_event.set(true);
    }

    /// Blocks the caller while the clock is paused.
    ///
    /// If the clock is in `StepOnce`, it is paused again first, so the caller
    /// blocks until the next `resume` or `step_once`.
    pub fn wait_if_paused(&self) {
        if self.state.load(atomic::Ordering::SeqCst) == PlaybackState::StepOnce {
            self.pause();
        }
        self.is_running_event.wait();
    }

    /// Releases a waiter for a single step. Does nothing unless the clock is paused.
    pub fn step_once(&self) {
        // Transition to StepOnce only if we are currently in Paused.
        let entered_step = self
            .state
            .compare_exchange(
                PlaybackState::Paused,
                PlaybackState::StepOnce,
                atomic::Ordering::SeqCst,
                atomic::Ordering::SeqCst,
            )
            .is_ok();
        if entered_step {
            self.stop_watch.lock().unwrap().resume();
            self.is_running_event.set(true);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration as StdDuration;

    /// Builds a clock whose stopwatch has just been reset.
    fn zeroed_clock() -> PlayerClock {
        let clock = PlayerClock::new();
        clock.stop_watch.lock().unwrap().reset();
        clock
    }

    /// A running clock must report (almost) the wall-clock time that has passed.
    #[test]
    fn clock_moves_forward_when_running() {
        let clock = zeroed_clock();
        std::thread::sleep(StdDuration::from_millis(50));
        let reading = clock.stop_watch.lock().unwrap().elapsed();
        assert!(reading >= StdDuration::from_millis(40));
    }

    /// While paused, the accumulated duration must stay constant.
    #[test]
    fn clock_does_not_advance_duration_when_paused() {
        let clock = zeroed_clock();
        clock.pause();
        let accumulated = || clock.stop_watch.lock().unwrap().duration;
        let before_sleep = accumulated();
        std::thread::sleep(StdDuration::from_millis(50));
        let after_sleep = accumulated();
        assert_eq!(
            before_sleep, after_sleep,
            "duration should not change while paused"
        );
    }

    /// `step_once` must leave the state untouched unless the clock is paused.
    #[test]
    fn step_once_ignored_when_not_paused() {
        let clock = PlayerClock::new();
        // Make sure the clock is in the running state before stepping.
        clock.resume();
        clock.step_once();
        assert_eq!(
            clock.state.load(atomic::Ordering::SeqCst),
            PlaybackState::Running
        );
    }
}