use std::collections::HashSet;
use std::path::PathBuf;
use std::time::Duration;
use core_types::Timestamp;
use replay_core::{FilePacketRecorder, FileReplaySession, RecordedEntry, ReplaySession};
use crate::constants::DEFAULT_BAG_PATH;
use crate::demo::make_demo_packet;
use crate::helpers::{
entry_to_json, first_positional, has_flag, option_value, parse_domain_option,
parse_replay_speed, parse_usize_option, replay_speed_label, topic_matches,
};
/// Prints the average message rate observed for one topic in a recorded bag file.
///
/// The topic name is taken from `first_positional(args)`. Recognised options are
/// `--input <path>` (falls back to `DEFAULT_BAG_PATH`) and `--json`, which switches the
/// report from plain text to pretty-printed JSON.
///
/// The rate is `(matching entries - 1) / span`, where the span is the difference between
/// the `captured_at` values of the last and the first matching entry in file order,
/// treated as nanoseconds. With fewer than two matching entries, or a zero span, the
/// reported rate is `0.0`.
///
/// # Errors
///
/// Returns a message when the topic name is missing, when the bag file cannot be opened,
/// or when the JSON report cannot be serialized.
pub fn topic_hz(args: &[String]) -> Result<(), String> {
    let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
    let input = match option_value(args, "--input") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_BAG_PATH),
    };
    let json = has_flag(args, "--json");

    let replay_file = match FileReplaySession::open(&input) {
        Ok(file) => file,
        Err(err) => return Err(format!("open bag file {} failed: {err}", input.display())),
    };

    // Single pass over the bag: count the matching entries and remember the first and the
    // last one, which is all the rate computation needs.
    let mut observed_entries = 0usize;
    let mut first_match = None;
    let mut last_match = None;
    for entry in replay_file.entries().iter() {
        if !topic_matches(&entry.topic, &topic_name) {
            continue;
        }
        if first_match.is_none() {
            first_match = Some(entry);
        }
        last_match = Some(entry);
        observed_entries += 1;
    }

    let capture_span_ns = if let (Some(head), Some(tail)) = (first_match, last_match) {
        tail.captured_at.0.saturating_sub(head.captured_at.0) as u64
    } else {
        0
    };

    // N entries delimit N - 1 intervals; a rate is only defined when there is at least one
    // interval of non-zero length.
    let hz = if observed_entries >= 2 && capture_span_ns != 0 {
        let seconds = capture_span_ns as f64 / 1_000_000_000.0;
        (observed_entries as f64 - 1.0) / seconds
    } else {
        0.0
    };

    if json {
        let payload = serde_json::json!({
            "topic": topic_name,
            "input": input,
            "observed_entries": observed_entries,
            "capture_span_ns": capture_span_ns,
            "hz": hz,
        });
        let rendered = serde_json::to_string_pretty(&payload)
            .map_err(|err| format!("serialize topic hz failed: {err}"))?;
        println!("{rendered}");
    } else {
        println!("RobotRT Topic Hz");
        println!("topic: {topic_name}");
        println!("input: {}", input.display());
        println!("observed_entries: {observed_entries}");
        println!("capture_span_ns: {capture_span_ns}");
        println!("hz: {hz:.3}");
    }
    Ok(())
}
/// Prints the first entries recorded for one topic in a bag file.
///
/// The topic name is taken from `first_positional(args)`. Recognised options are
/// `--input <path>` (falls back to `DEFAULT_BAG_PATH`), `--limit <n>` (default `5`, the
/// maximum number of matching entries shown) and `--json`, which wraps the entries in a
/// pretty-printed JSON document instead of printing one entry per line.
///
/// Each entry is rendered through `entry_to_json`.
///
/// # Errors
///
/// Returns a message when the topic name is missing, when `--limit` is rejected by
/// `parse_usize_option`, when the bag file cannot be opened, or when the JSON document
/// cannot be serialized.
pub fn topic_echo(args: &[String]) -> Result<(), String> {
    let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
    let input = match option_value(args, "--input") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_BAG_PATH),
    };
    let limit = parse_usize_option(args, "--limit", 5)?;
    let json = has_flag(args, "--json");

    let replay_file = match FileReplaySession::open(&input) {
        Ok(file) => file,
        Err(err) => return Err(format!("open bag file {} failed: {err}", input.display())),
    };

    // Entries are kept in file order; only the first `limit` matches are rendered.
    let matching = replay_file
        .entries()
        .iter()
        .filter(|entry| topic_matches(&entry.topic, &topic_name));
    let rows: Vec<_> = matching.take(limit).map(entry_to_json).collect();

    if !json {
        println!("RobotRT Topic Echo");
        println!("topic: {topic_name}");
        println!("input: {}", input.display());
        for row in &rows {
            println!("{row}");
        }
        return Ok(());
    }

    let payload = serde_json::json!({
        "topic": topic_name,
        "input": input,
        "limit": limit,
        "entries": rows,
    });
    let rendered = serde_json::to_string_pretty(&payload)
        .map_err(|err| format!("serialize topic echo failed: {err}"))?;
    println!("{rendered}");
    Ok(())
}
/// Records a run of demo packets into a bag file and prints a short summary.
///
/// Recognised options are `--output <path>` (falls back to `DEFAULT_BAG_PATH`),
/// `--count <n>` (default `20`), `--topic <name>` (default `/demo/bag`) and `--domain`,
/// whose value is interpreted by `parse_domain_option`.
///
/// The parent directory of the output path is created when it is missing. Packet `seq`
/// comes from `make_demo_packet(seq, domain)` and is stamped with `Timestamp::now()` at
/// the moment it is appended.
///
/// # Errors
///
/// Returns a message when `--count` or `--domain` is rejected, when the output directory
/// or bag file cannot be created, or when appending to or flushing the recorder fails.
pub fn bag_record(args: &[String]) -> Result<(), String> {
    let output = match option_value(args, "--output") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_BAG_PATH),
    };
    let count = parse_usize_option(args, "--count", 20)?;
    let topic = option_value(args, "--topic").unwrap_or_else(|| String::from("/demo/bag"));
    let domain = parse_domain_option(option_value(args, "--domain"))?;

    if let Some(dir) = output.parent() {
        if let Err(err) = std::fs::create_dir_all(dir) {
            return Err(format!(
                "create output directory {} failed: {err}",
                dir.display()
            ));
        }
    }

    let mut recorder = match FilePacketRecorder::create(&output) {
        Ok(recorder) => recorder,
        Err(err) => return Err(format!("create bag file {} failed: {err}", output.display())),
    };

    let mut seq = 0usize;
    while seq < count {
        let stamped = RecordedEntry::new(
            Timestamp::now(),
            topic.clone(),
            make_demo_packet(seq as u64, domain),
        );
        recorder
            .append(&stamped)
            .map_err(|err| format!("append bag entry failed: {err}"))?;
        seq += 1;
    }

    // Flush explicitly so that a write failure is reported instead of being lost.
    recorder
        .flush()
        .map_err(|err| format!("flush bag file {} failed: {err}", output.display()))?;

    println!("bag recorded: {}", output.display());
    println!("entries: {count}");
    println!("topic: {topic}");
    println!("domain: {domain:?}");
    Ok(())
}
/// Replays a recorded bag file through a `ReplaySession` and prints a summary.
///
/// Recognised options are `--input <path>` (falls back to `DEFAULT_BAG_PATH`), `--speed`,
/// whose value is interpreted by `parse_replay_speed`, and `--json`, which switches the
/// summary from plain text to pretty-printed JSON.
///
/// The session is pumped with a no-op callback until it reports completion; whenever a
/// pump emits nothing the thread sleeps for one millisecond before trying again. The
/// summary lists the total and emitted entry counts, the number of distinct topics, the
/// span between the first and last entry's `captured_at` values, and the wall-clock time
/// the replay loop took.
///
/// # Errors
///
/// Returns a message when `--speed` is rejected, when the bag file cannot be opened, or
/// when the JSON summary cannot be serialized.
pub fn bag_play(args: &[String]) -> Result<(), String> {
    let input = match option_value(args, "--input") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_BAG_PATH),
    };
    let speed = parse_replay_speed(option_value(args, "--speed"))?;
    let json = has_flag(args, "--json");

    let replay_file = match FileReplaySession::open(&input) {
        Ok(file) => file,
        Err(err) => return Err(format!("open bag file {} failed: {err}", input.display())),
    };
    let entries = replay_file.entries();

    let mut session = ReplaySession::new(entries, speed.clone());
    let started_at = std::time::Instant::now();
    let mut emitted = 0usize;
    loop {
        if session.is_done() {
            break;
        }
        let pumped = session.pump(|_entry| {});
        emitted += pumped;
        if pumped == 0 {
            // Nothing was emitted on this pump; back off briefly before the next attempt.
            std::thread::sleep(Duration::from_millis(1));
        }
    }
    let elapsed_ns = started_at.elapsed().as_nanos() as u64;

    let mut topics = HashSet::new();
    for entry in entries.iter() {
        topics.insert(entry.topic.as_str());
    }
    let topic_count = topics.len();

    let capture_span_ns = entries
        .first()
        .zip(entries.last())
        .map_or(0, |(head, tail)| {
            tail.captured_at.0.saturating_sub(head.captured_at.0) as u64
        });

    let total_entries = entries.len();
    let speed_label = replay_speed_label(&speed);

    if !json {
        println!("RobotRT Bag Replay");
        println!("input: {}", input.display());
        println!("speed: {speed_label}");
        println!("total_entries: {total_entries}");
        println!("emitted_entries: {emitted}");
        println!("topic_count: {topic_count}");
        println!("capture_span_ns: {capture_span_ns}");
        println!("replay_elapsed_ns: {elapsed_ns}");
        return Ok(());
    }

    let payload = serde_json::json!({
        "input": input,
        "speed": speed_label,
        "total_entries": total_entries,
        "emitted_entries": emitted,
        "topic_count": topic_count,
        "capture_span_ns": capture_span_ns,
        "replay_elapsed_ns": elapsed_ns,
    });
    let out = serde_json::to_string_pretty(&payload)
        .map_err(|err| format!("serialize bag replay summary failed: {err}"))?;
    println!("{out}");
    Ok(())
}