use crate::args::Args;
use crate::clock::PlayerClock;
use crate::message_queue::{BagIndex, MessageQueue, MessageQueueOptions};
use crate::qos::read_qos_for_publishers;
use anyhow::{bail, Context, Result};
use crossterm::event::{self, Event as CEvent, KeyCode};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use mcap::read::{RawMessage, Summary};
use memmap2::Mmap;
use nix::sys::termios::{self, LocalFlags, SetArg};
use r2r::builtin_interfaces::msg::Time;
use r2r::rosgraph_msgs::msg::Clock;
use r2r::{Node, Publisher, PublisherUntyped, QosProfile};
use regex::Regex;
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
use std::time::Duration;
pub type ChannelId = u16;
pub type PublisherMap = Vec<HashMap<ChannelId, PublisherUntyped>>;
pub struct TopicsHandler {
is_publishing_clock: bool,
topics_to_include: HashSet<String>,
topics_to_exclude: HashSet<String>,
regex_to_include: Option<Regex>,
regex_to_exclude: Option<Regex>,
topics_remap: HashMap<String, String>,
}
impl TopicsHandler {
pub fn new(args: &Args) -> Result<Self> {
let regex_to_include = args
.regex
.as_deref()
.map(Regex::new)
.transpose()
.context("invalid include regex")?;
let regex_to_exclude = args
.exclude_regex
.as_deref()
.map(Regex::new)
.transpose()
.context("invalid exclude regex")?;
Ok(Self {
is_publishing_clock: args.clock,
topics_to_include: args.topics.clone().into_iter().collect(),
topics_to_exclude: args.exclude_topics.clone().into_iter().collect(),
regex_to_include,
regex_to_exclude,
topics_remap: args
.remap
.iter()
.map(|r| (r.0.clone(), r.1.clone()))
.collect(),
})
}
pub fn resolve_topic(&self, topic_name: &str) -> Option<String> {
if (!self.topics_to_include.is_empty() && !self.topics_to_include.contains(topic_name))
|| (self.is_publishing_clock && topic_name == "/clock")
|| !self
.regex_to_include
.as_ref()
.map(|r| r.is_match(topic_name))
.unwrap_or(true)
|| self
.regex_to_exclude
.as_ref()
.map(|r| r.is_match(topic_name))
.unwrap_or(false)
|| self.topics_to_exclude.contains(topic_name)
{
return None;
}
Some(
self.topics_remap
.get(topic_name)
.unwrap_or(&topic_name.to_string())
.clone(),
)
}
}
pub struct PublishHandler {
publishers: PublisherMap,
maybe_clock_publisher: Option<Publisher<Clock>>,
last_time_clock_was_published: Option<std::time::Duration>,
}
impl PublishHandler {
pub fn new(publishers: PublisherMap, maybe_clock_publisher: Option<Publisher<Clock>>) -> Self {
Self {
publishers,
maybe_clock_publisher,
last_time_clock_was_published: None,
}
}
pub fn channels_to_publish(&self) -> Vec<HashSet<u16>> {
self.publishers
.iter()
.map(|hm| hm.keys().cloned().collect())
.collect()
}
fn publish<'a>(
&mut self,
message_for_bag: (usize, RawMessage<'a>),
clock_period: Duration,
) -> Result<()> {
let (i_bag, message) = message_for_bag;
let pub_time = std::time::Duration::from_nanos(message.header.publish_time);
let channel_id = message.header.channel_id;
let p = self.publishers[i_bag]
.get(&channel_id)
.context("publisher not found")?;
if let Err(e) = p.publish_raw(&message.data) {
log::warn!("Failed to publish on channel {}: {}", channel_id, e);
}
if let Some(clock_publisher) = &self.maybe_clock_publisher {
let should_publish_clock = match self.last_time_clock_was_published {
None => true,
Some(last) => pub_time.saturating_sub(last) >= clock_period,
};
if should_publish_clock {
if let Err(e) = clock_publisher.publish(&Clock {
clock: Time {
sec: pub_time.as_secs() as i32,
nanosec: pub_time.subsec_nanos(),
},
}) {
log::warn!("Failed to publish /clock: {}", e);
} else {
self.last_time_clock_was_published = Some(pub_time);
}
}
}
Ok(())
}
}
struct TerminalRawMode(termios::Termios);
impl TerminalRawMode {
pub fn new() -> Result<TerminalRawMode> {
let orig = termios::tcgetattr(std::io::stdin()).context("tcgetattr failed")?;
let mut raw = orig.clone();
termios::cfmakeraw(&mut raw);
raw.output_flags = orig.output_flags;
raw.local_flags.insert(LocalFlags::ISIG);
termios::tcsetattr(std::io::stdin(), SetArg::TCSADRAIN, &raw)
.context("tcsetattr raw failed")?;
Ok(TerminalRawMode(orig))
}
}
impl Drop for TerminalRawMode {
fn drop(&mut self) {
let _ = termios::tcsetattr(std::io::stdin(), SetArg::TCSANOW, &self.0);
}
}
pub struct Player {
args: Args,
pub(crate) player_clock: PlayerClock,
bags_maps: Vec<Mmap>,
bag_time: Option<(u64, u64)>,
progress_bar: Option<ProgressBar>,
}
impl Player {
pub fn new(args: Args) -> Self {
Self {
args,
player_clock: PlayerClock::new(),
bags_maps: Vec::new(),
bag_time: None,
progress_bar: None,
}
}
pub fn play(&mut self, node: &mut Node) -> Result<()> {
let mut publish_handler = self.initialize(node)?;
let channels_to_publish = publish_handler.channels_to_publish();
let is_running = atomic::AtomicBool::new(true);
self.initialize_progressbar();
thread::scope(|s| {
s.spawn(|| {
self.handle_keyboard_events_loop(&is_running).unwrap();
});
loop {
let (tx, rx) = sync_channel(100);
let read_msg_thread = s.spawn(|| {
if let Err(e) = self.read_messages_loop(tx, &channels_to_publish) {
log::error!("read_messages_loop error: {}", e);
}
});
self.publish_loop(rx, &mut publish_handler)
.expect("publish loop failed");
let _ = read_msg_thread.join();
if !self.args.r#loop {
break;
}
log::info!("Restarting playback ..");
}
log::info!("Finished playing.");
is_running.store(false, atomic::Ordering::SeqCst);
});
Ok(())
}
pub fn open_rosbag(bag_file: &str) -> Result<Mmap> {
let p = Path::new(bag_file);
let resolved = if p.is_dir() {
Self::find_mcap_in_dir(p)?
} else {
p.to_path_buf()
};
let resolved = Self::ensure_decompressed_if_needed(&resolved)?;
log::info!("Reading bag file: {} ...", resolved.display());
Self::map_mcap_path(&resolved)
}
fn find_mcap_in_dir(dir: &Path) -> Result<PathBuf> {
let mut mcap: Option<PathBuf> = None;
let mut zstd: Option<PathBuf> = None;
for entry in
std::fs::read_dir(dir).with_context(|| format!("read_dir {}", dir.display()))?
{
let entry = entry?;
let path = entry.path();
if path.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if name == "metadata.yaml" {
continue;
}
}
match path.extension().and_then(|e| e.to_str()) {
Some("mcap") => {
if mcap.is_none() {
mcap = Some(path);
}
}
Some("zstd") => {
if path
.file_name()
.and_then(|s| s.to_str())
.is_some_and(|n| n.ends_with(".mcap.zstd"))
&& zstd.is_none()
{
zstd = Some(path);
}
}
_ => {}
}
}
}
if let Some(p) = mcap {
return Ok(p);
}
if let Some(p) = zstd {
return Ok(p);
}
bail!("No .mcap or .mcap.zstd found in {}", dir.display())
}
fn ensure_decompressed_if_needed(path: &Path) -> Result<PathBuf> {
let fname = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
if fname.ends_with(".mcap.zstd") {
let dest = Self::dest_mcap_path(path);
if dest.exists() {
log::info!("Using cached decompressed MCAP: {}", dest.display());
return Ok(dest);
}
log::info!("Decompressing {} to {} ...", path.display(), dest.display());
Self::decompress_zstd_to(path, &dest)?;
Ok(dest)
} else {
Ok(path.to_path_buf())
}
}
fn dest_mcap_path(zstd_path: &Path) -> PathBuf {
let mut dest = zstd_path.to_path_buf();
let stem = zstd_path.file_stem().unwrap_or_default().to_os_string();
dest.set_file_name(stem);
dest
}
fn decompress_zstd_to(src: &Path, dest: &Path) -> Result<()> {
let mut in_f =
std::fs::File::open(src).with_context(|| format!("open {}", src.display()))?;
let mut dec = zstd::stream::read::Decoder::new(&mut in_f).context("zstd decoder init")?;
let mut tmp = dest.to_path_buf();
tmp.set_extension("mcap.part");
let mut out_f =
std::fs::File::create(&tmp).with_context(|| format!("create {}", tmp.display()))?;
let mut buf = [0u8; 128 * 1024];
loop {
let n = dec.read(&mut buf)?;
if n == 0 {
break;
}
use std::io::Write;
out_f.write_all(&buf[..n])?;
}
use std::io::Write;
out_f.flush()?;
std::fs::rename(&tmp, dest)
.with_context(|| format!("rename {} -> {}", tmp.display(), dest.display()))?;
Ok(())
}
fn map_mcap_path(p: &Path) -> Result<Mmap> {
let fd = std::fs::File::open(p).with_context(|| format!("open {}", p.display()))?;
unsafe { Mmap::map(&fd) }.context("Couldn't map MCAP file")
}
pub fn choose_qos(topic_name: &str) -> QosProfile {
if topic_name == "/tf_static" {
QosProfile::default().transient_local()
} else {
QosProfile::default()
}
}
pub fn initialize(&mut self, node: &mut Node) -> Result<PublishHandler> {
let mut summaries: Vec<Summary> = Vec::new();
for filename in self.args.input.iter() {
let file_buffer = Self::open_rosbag(filename)?;
let summary_opt = Summary::read(&file_buffer).context("read summary")?;
let Some(summary) = summary_opt else {
bail!("MCAP missing header: {}", filename)
};
let start_time = summary
.stats
.as_ref()
.context("missing stats")?
.message_start_time;
let end_time = summary
.stats
.as_ref()
.context("missing stats")?
.message_end_time;
if self.bag_time.is_none() {
self.bag_time = Some((start_time, end_time));
} else if let Some((beg, end)) = self.bag_time {
self.bag_time = Some((beg.min(start_time), end.max(end_time)));
}
summaries.push(summary);
self.bags_maps.push(file_buffer);
}
self.create_publishers(node, &summaries)
}
pub fn create_publishers(
&mut self,
node: &mut Node,
bag_summaries: &[Summary],
) -> Result<PublishHandler> {
let mut publishers: PublisherMap = vec![HashMap::new(); self.args.input.len()];
let topics_handler = TopicsHandler::new(&self.args)?;
let mut available_topics: HashSet<String> = HashSet::new();
for (i_bag, summary) in bag_summaries.iter().enumerate() {
let qos_for_publishers = read_qos_for_publishers(&self.bags_maps[i_bag], summary);
for (channel_id, channel) in &summary.channels {
let topic_name = &channel.topic;
let message_type = &channel.schema.as_ref().context("missing schema name")?.name;
available_topics.insert(topic_name.clone());
if let Some(target_topic_name) = topics_handler.resolve_topic(topic_name) {
log::info!("Setting up publisher for topic '{}'", target_topic_name);
match node.create_publisher_untyped(
target_topic_name.as_str(),
message_type.as_str(),
qos_for_publishers
.get(topic_name)
.cloned()
.unwrap_or_else(|| Self::choose_qos(topic_name)),
) {
Ok(pub_untyped) => {
publishers[i_bag].insert(*channel_id, pub_untyped);
}
Err(e) => {
log::warn!(
"Topic '{}' will not be published: {}",
target_topic_name,
e
);
}
}
}
}
}
for topic in &self.args.topics {
if !available_topics.contains(topic) {
log::warn!("Requested topic not found in bag: {topic}",);
}
}
let maybe_clock_pub = if self.args.clock {
Some(
node.create_publisher::<Clock>("/clock", QosProfile::default())
.context("create /clock publisher")?,
)
} else {
None
};
Ok(PublishHandler::new(publishers, maybe_clock_pub))
}
fn initialize_progressbar(&mut self) {
let bag_time = self.bag_time.unwrap();
self.progress_bar = Some(ProgressBar::new(bag_time.1 - bag_time.0));
if let Ok(style) = ProgressStyle::with_template(
"{spinner:.green} [{wide_bar:.cyan/blue}] {pos_time}/{total_time} (-{remaining})",
) {
let style = style
.with_key("pos_time", |state: &ProgressState, w: &mut dyn Write| {
let secs = (state.pos() as f64) / 1e9;
let _ = write!(w, "{:.1}s", secs);
})
.with_key("total_time", |state: &ProgressState, w: &mut dyn Write| {
let len = state.len().unwrap_or(0);
let secs = (len as f64) / 1e9;
let _ = write!(w, "{:.1}s", secs);
})
.with_key("remaining", |state: &ProgressState, w: &mut dyn Write| {
let len = state.len().unwrap_or(0);
let pos = state.pos();
let rem = len.saturating_sub(pos);
let secs = (rem as f64) / 1e9;
let _ = write!(w, "{:.1}s", secs);
})
.progress_chars("#>-");
self.progress_bar.as_ref().unwrap().set_style(style);
} else {
log::warn!("Failed to configure progress bar style");
}
}
pub fn publish_loop(
&self,
message_receiver: Receiver<(BagIndex, RawMessage<'_>)>,
publish_handler: &mut PublishHandler,
) -> Result<()> {
let mut earliest_start_time_nano: Option<std::time::Duration> = None;
let clock_period: Duration =
std::time::Duration::from_secs_f64(1.0 / self.args.clock_rate_hz);
let bag_start_time = self.bag_time.unwrap().0;
while let Ok((i_bag, message)) = message_receiver.recv() {
self.player_clock.wait_if_paused();
let pub_time = std::time::Duration::from_nanos(message.header.publish_time);
if earliest_start_time_nano.is_none() {
self.progress_bar.as_ref().unwrap().suspend(|| {
log::info!("Starting playing ...");
});
earliest_start_time_nano = Some(pub_time);
self.player_clock.stop_watch.lock().unwrap().reset();
}
let base = earliest_start_time_nano.unwrap_or(pub_time);
let msg_time_since_first_msg = match pub_time.checked_sub(base) {
Some(d) => d,
None => std::time::Duration::ZERO,
};
let system_time_since_play_start =
self.player_clock.stop_watch.lock().unwrap().elapsed();
if msg_time_since_first_msg < system_time_since_play_start {
let delay_ms =
(system_time_since_play_start - msg_time_since_first_msg).as_secs_f64() * 1000.;
if delay_ms > 50. {
self.progress_bar.as_ref().unwrap().suspend(|| {
log::warn!(
"Message delay > 50ms occurred ({:.1} ms), the player can't keep up, please reduce the playback rate",
delay_ms
);
});
}
} else {
thread::sleep(msg_time_since_first_msg - system_time_since_play_start);
}
self.progress_bar
.as_ref()
.unwrap()
.set_position(message.header.publish_time - bag_start_time);
publish_handler.publish((i_bag, message), clock_period)?;
}
self.progress_bar.as_ref().unwrap().finish();
Ok(())
}
pub fn read_messages_loop<'a>(
&'a self,
messages_sender: SyncSender<(BagIndex, RawMessage<'a>)>,
channels_to_publish: &[HashSet<u16>],
) -> Result<()> {
let lookahead = Duration::from_secs_f64(self.args.lookahead);
let (bag_start, _) = self
.bag_time
.ok_or_else(|| anyhow::anyhow!("Bag time not initialized"))?;
let mut message_queue = MessageQueue::new(
&self.bags_maps,
MessageQueueOptions {
start_offset: Duration::from_secs_f64(self.args.start_offset.unwrap_or(0.0)),
bag_start_time: Duration::from_nanos(bag_start),
},
)?;
log::info!("Filling buffer ...");
while let Some((bag_index, message)) =
message_queue.get_next(&lookahead, channels_to_publish)
{
if let Err(e) = messages_sender.send((bag_index, message)) {
log::warn!("Receiver disconnected: {}", e);
break;
}
}
drop(messages_sender);
Ok(())
}
fn toggle_pause(&self) {
if self.player_clock.is_paused() {
self.progress_bar.as_ref().unwrap().suspend(|| {
log::info!("Resuming play");
});
self.player_clock.resume();
} else {
self.progress_bar.as_ref().unwrap().suspend(|| {
log::info!("Pausing play");
});
self.player_clock.pause();
}
}
pub fn handle_keyboard_events_loop(&self, is_running: &AtomicBool) -> Result<()> {
self.player_clock
.stop_watch
.lock()
.unwrap()
.set_rate(self.args.rate.unwrap_or(1.));
log::info!("Press SPACE for Pause/Resume");
log::info!("Press CURSOR_RIGHT for Playing Next Message");
log::info!("Press CURSOR_UP to Increase Rate 10%");
log::info!("Press CURSOR_DOWN to Decrease Rate 10%");
let _guard = TerminalRawMode::new()?;
while is_running.load(atomic::Ordering::SeqCst) {
if event::poll(Duration::from_millis(100)).unwrap_or(false) {
if let Ok(CEvent::Key(key_event)) = event::read() {
if key_event.code == KeyCode::Char(' ') {
self.toggle_pause();
} else if key_event.code == KeyCode::Down || key_event.code == KeyCode::Up {
let mut timer = self.player_clock.stop_watch.lock().unwrap();
let current_rate = timer.rate;
if key_event.code == KeyCode::Down {
timer.set_rate((current_rate - 0.1).max(0.1))
} else {
timer.set_rate(current_rate + 0.1)
};
self.progress_bar.as_ref().unwrap().suspend(|| {
log::info!("Set rate to {:.1}", timer.rate);
});
} else if key_event.code == KeyCode::Right {
self.progress_bar.as_ref().unwrap().suspend(|| {
log::info!("Stepping one message ahead");
});
self.player_clock.step_once();
}
}
}
}
Ok(())
}
}