use clap::Parser;
use hdds::{Participant, TransportMode};
use hdds_recording::{
filter::TopicFilter,
recorder::{Recorder, RecorderConfig},
rotation::RotationPolicy,
};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{info, warn};
#[derive(Parser, Debug)]
#[command(name = "hdds-record")]
#[command(about = "Record DDS messages to file")]
#[command(version)]
struct Args {
#[arg(short, long, default_value = "0")]
domain: u32,
#[arg(short, long)]
output: PathBuf,
#[arg(short, long)]
topics: Option<String>,
#[arg(long)]
exclude_topics: Option<String>,
#[arg(long)]
types: Option<String>,
#[arg(long)]
description: Option<String>,
#[arg(long)]
rotate_size: Option<u64>,
#[arg(long)]
rotate_duration: Option<u64>,
#[arg(long, default_value = "0")]
max_files: u32,
#[arg(long, default_value = "0")]
duration: u64,
#[arg(long, default_value = "info")]
log_level: String,
#[arg(short, long)]
quiet: bool,
}
struct RecordingReader {
reader: hdds::RawDataReader,
type_name: String,
qos_hash: u32,
}
fn main() -> anyhow::Result<()> {
let args = Args::parse();
let filter = args.log_level.parse().unwrap_or(tracing::Level::INFO);
tracing_subscriber::fmt()
.with_max_level(filter)
.with_target(false)
.init();
let mut config = RecorderConfig::new(&args.output).domain_id(args.domain);
if let Some(pattern) = &args.topics {
let patterns: Vec<String> = pattern.split(',').map(|s| s.trim().to_string()).collect();
config = config.topic_filter(TopicFilter::include(patterns));
} else if let Some(pattern) = &args.exclude_topics {
let patterns: Vec<String> = pattern.split(',').map(|s| s.trim().to_string()).collect();
config = config.topic_filter(TopicFilter::exclude(patterns));
}
if let Some(desc) = &args.description {
config = config.description(desc.clone());
}
if let Some(size_mb) = args.rotate_size {
let policy = RotationPolicy::by_size(size_mb).with_max_files(args.max_files);
config = config.rotation(policy);
} else if let Some(duration) = args.rotate_duration {
let policy = RotationPolicy::by_duration(duration).with_max_files(args.max_files);
config = config.rotation(policy);
}
let mut recorder = Recorder::new(config);
if !args.quiet {
info!("HDDS Recording Service v{}", env!("CARGO_PKG_VERSION"));
info!("Domain: {}", args.domain);
info!("Output: {}", args.output.display());
if let Some(ref topics) = args.topics {
info!("Topics: {}", topics);
}
}
recorder.start()?;
if !args.quiet {
info!("Recording started. Press Ctrl+C to stop.");
}
let running = Arc::new(AtomicBool::new(true));
let r = Arc::clone(&running);
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
})?;
let participant = Participant::builder("hdds-record")
.with_transport(TransportMode::UdpMulticast)
.domain_id(args.domain)
.build()?;
let topic_filter = recorder.config().topic_filter.clone();
let type_filter = recorder.config().type_filter.clone();
let mut readers: HashMap<String, RecordingReader> = HashMap::new();
let mut last_discovery = Instant::now()
.checked_sub(Duration::from_secs(1))
.unwrap_or_else(Instant::now);
let mut last_report = Instant::now();
let start = Instant::now();
let duration_limit = if args.duration > 0 {
Some(std::time::Duration::from_secs(args.duration))
} else {
None
};
while running.load(Ordering::SeqCst) {
if let Some(limit) = duration_limit {
if start.elapsed() >= limit {
info!("Duration limit reached");
break;
}
}
if last_discovery.elapsed() >= Duration::from_secs(1) {
match participant.discover_topics() {
Ok(topics) => {
for info in topics {
if info.publisher_count == 0 {
continue;
}
if let Some(ref filter) = topic_filter {
if !filter.matches(&info.name) {
continue;
}
}
if let Some(ref filter) = type_filter {
if !filter.matches(&info.type_name) {
continue;
}
}
if readers.contains_key(&info.name) {
continue;
}
let reader = match participant.create_raw_reader_with_type(
&info.name,
&info.type_name,
Some(info.qos.clone()),
info.type_object.clone(),
) {
Ok(reader) => reader,
Err(err) => {
warn!("Failed to create reader for {}: {}", info.name, err);
continue;
}
};
readers.insert(
info.name.clone(),
RecordingReader {
reader,
type_name: info.type_name.clone(),
qos_hash: info.qos_hash,
},
);
}
}
Err(err) => {
warn!("DDS discovery failed: {}", err);
}
}
last_discovery = Instant::now();
}
for (topic, entry) in readers.iter() {
match entry.reader.try_take_raw() {
Ok(samples) => {
for sample in samples {
let seq = sample.sequence_number.unwrap_or(0);
let writer_guid = if sample.writer_guid.is_zero() {
"unknown".to_string()
} else {
sample.writer_guid.to_string()
};
recorder.record_sample(
topic,
&entry.type_name,
&writer_guid,
seq,
&sample.payload,
entry.qos_hash,
)?;
}
}
Err(err) => {
warn!("DDS read failed for {}: {}", topic, err);
}
}
}
if !args.quiet && last_report.elapsed() >= Duration::from_secs(10) {
let stats = recorder.stats();
if stats.message_count > 0 {
info!(
"Recorded {} messages ({:.1} MB)",
stats.message_count,
stats.bytes_written as f64 / 1_048_576.0
);
}
last_report = Instant::now();
}
std::thread::sleep(Duration::from_millis(20));
}
let stats = recorder.stop()?;
if !args.quiet {
info!("Recording stopped");
info!(" Messages: {}", stats.message_count);
info!(" Duration: {:.1}s", stats.duration_secs);
info!(" Throughput: {:.1} msg/s", stats.messages_per_second);
info!(" File: {}", args.output.display());
}
Ok(())
}