robotrt-cli 0.1.0-beta.1

RobotRT modular robotics runtime and middleware components.
use std::collections::HashSet;
use std::path::PathBuf;
use std::time::Duration;

use core_types::Timestamp;
use replay_core::{FilePacketRecorder, FileReplaySession, RecordedEntry, ReplaySession};

use crate::constants::DEFAULT_BAG_PATH;
use crate::demo::make_demo_packet;
use crate::helpers::{
    entry_to_json, first_positional, has_flag, option_value, parse_domain_option,
    parse_replay_speed, parse_usize_option, replay_speed_label, topic_matches,
};

/// Prints the average message rate of one topic found in a recorded bag file.
///
/// Arguments, as read through the CLI helpers:
/// - the first positional argument is the topic name (required);
/// - `--input <path>` selects the bag file and falls back to `DEFAULT_BAG_PATH`;
/// - `--json` switches the report from plain text to pretty-printed JSON.
///
/// The rate is `(matching entries - 1) / span`, where the span is the
/// difference between the `captured_at` values of the first and the last
/// matching entry in file order, which this function treats as nanoseconds.
/// With fewer than two matching entries, or a zero span, the rate is `0.0`.
///
/// # Errors
///
/// Returns a message when the topic name is missing, when the bag file cannot
/// be opened, or when the JSON report cannot be serialized.
pub fn topic_hz(args: &[String]) -> Result<(), String> {
    let topic_name = match first_positional(args) {
        Some(name) => name,
        None => return Err(String::from("missing topic name")),
    };
    let input = match option_value(args, "--input") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_BAG_PATH),
    };
    let as_json = has_flag(args, "--json");

    let bag = match FileReplaySession::open(&input) {
        Ok(bag) => bag,
        Err(err) => return Err(format!("open bag file {} failed: {err}", input.display())),
    };
    let matching: Vec<_> = bag
        .entries()
        .iter()
        .filter(|entry| topic_matches(&entry.topic, &topic_name))
        .collect();

    let observed_entries = matching.len();
    // `first()` and `last()` are either both present or both absent, so
    // zipping them yields the span for a non-empty selection and 0 otherwise.
    let capture_span_ns = matching
        .first()
        .zip(matching.last())
        .map_or(0, |(first, last)| {
            last.captured_at.0.saturating_sub(first.captured_at.0) as u64
        });

    // N samples delimit N - 1 intervals; a rate needs at least one interval
    // of non-zero length.
    let hz = if observed_entries >= 2 && capture_span_ns != 0 {
        let seconds = capture_span_ns as f64 / 1_000_000_000.0;
        (observed_entries as f64 - 1.0) / seconds
    } else {
        0.0
    };

    if !as_json {
        println!("RobotRT Topic Hz");
        println!("topic: {}", topic_name);
        println!("input: {}", input.display());
        println!("observed_entries: {}", observed_entries);
        println!("capture_span_ns: {}", capture_span_ns);
        println!("hz: {:.3}", hz);
        return Ok(());
    }

    let payload = serde_json::json!({
        "topic": topic_name,
        "input": input,
        "observed_entries": observed_entries,
        "capture_span_ns": capture_span_ns,
        "hz": hz,
    });
    let rendered = serde_json::to_string_pretty(&payload)
        .map_err(|err| format!("serialize topic hz failed: {err}"))?;
    println!("{}", rendered);

    Ok(())
}

pub fn topic_echo(args: &[String]) -> Result<(), String> {
    let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
    let input = option_value(args, "--input")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(DEFAULT_BAG_PATH));
    let limit = parse_usize_option(args, "--limit", 5)?;
    let json = has_flag(args, "--json");

    let replay_file = FileReplaySession::open(&input)
        .map_err(|err| format!("open bag file {} failed: {err}", input.display()))?;
    let rows = replay_file
        .entries()
        .iter()
        .filter(|entry| topic_matches(&entry.topic, &topic_name))
        .take(limit)
        .map(entry_to_json)
        .collect::<Vec<_>>();

    if json {
        let payload = serde_json::json!({
            "topic": topic_name,
            "input": input,
            "limit": limit,
            "entries": rows,
        });
        println!(
            "{}",
            serde_json::to_string_pretty(&payload)
                .map_err(|err| format!("serialize topic echo failed: {err}"))?
        );
    } else {
        println!("RobotRT Topic Echo");
        println!("topic: {}", topic_name);
        println!("input: {}", input.display());
        for row in rows {
            println!("{}", row);
        }
    }

    Ok(())
}

/// Records a bag file filled with generated demo packets.
///
/// Arguments, as read through the CLI helpers:
/// - `--output <path>` is the bag file to create (falls back to
///   `DEFAULT_BAG_PATH`); its parent directory is created when needed;
/// - `--count <n>` is the number of entries to write (`20` is passed to
///   `parse_usize_option` as the fallback value);
/// - `--topic <name>` is the topic stored on every entry (default `/demo/bag`);
/// - `--domain <value>` is interpreted by `parse_domain_option`.
///
/// Each entry wraps the packet returned by `make_demo_packet` for the current
/// sequence number and is stamped with `Timestamp::now()`. A short summary is
/// printed once the recorder has been flushed.
///
/// # Errors
///
/// Returns a message when an option fails to parse, when the output directory
/// or the bag file cannot be created, or when appending or flushing fails.
pub fn bag_record(args: &[String]) -> Result<(), String> {
    let output = match option_value(args, "--output") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_BAG_PATH),
    };
    let count = parse_usize_option(args, "--count", 20)?;
    let topic = match option_value(args, "--topic") {
        Some(name) => name,
        None => String::from("/demo/bag"),
    };
    let domain = parse_domain_option(option_value(args, "--domain"))?;

    // Make sure the destination directory exists before creating the file.
    if let Some(dir) = output.parent() {
        if let Err(err) = std::fs::create_dir_all(dir) {
            return Err(format!(
                "create output directory {} failed: {err}",
                dir.display()
            ));
        }
    }

    let mut recorder = match FilePacketRecorder::create(&output) {
        Ok(recorder) => recorder,
        Err(err) => {
            return Err(format!(
                "create bag file {} failed: {err}",
                output.display()
            ))
        }
    };

    for seq in 0..count {
        // Build the packet first so the timestamp is taken right before the
        // entry is assembled.
        let packet = make_demo_packet(seq as u64, domain);
        let entry = RecordedEntry::new(Timestamp::now(), topic.clone(), packet);
        if let Err(err) = recorder.append(&entry) {
            return Err(format!("append bag entry failed: {err}"));
        }
    }
    if let Err(err) = recorder.flush() {
        return Err(format!(
            "flush bag file {} failed: {err}",
            output.display()
        ));
    }

    println!("bag recorded: {}", output.display());
    println!("entries: {}", count);
    println!("topic: {}", topic);
    println!("domain: {:?}", domain);
    Ok(())
}

/// Replays a recorded bag file and prints a summary of the run.
///
/// Arguments, as read through the CLI helpers:
/// - `--input <path>` selects the bag file and falls back to `DEFAULT_BAG_PATH`;
/// - `--speed <value>` is interpreted by `parse_replay_speed`;
/// - `--json` switches the summary from plain text to pretty-printed JSON.
///
/// All entries of the bag are handed to a `ReplaySession`, which is pumped
/// with a no-op sink until it reports completion. The summary contains the
/// total and emitted entry counts, the number of distinct topics, the capture
/// span (difference between the `captured_at` values of the first and last
/// entry in file order) and the wall-clock time the replay took.
///
/// # Errors
///
/// Returns a message when `--speed` is rejected, when the bag file cannot be
/// opened, or when the JSON summary cannot be serialized.
pub fn bag_play(args: &[String]) -> Result<(), String> {
    let input = option_value(args, "--input")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(DEFAULT_BAG_PATH));
    let speed = parse_replay_speed(option_value(args, "--speed"))?;
    let json = has_flag(args, "--json");

    let replay_file = FileReplaySession::open(&input)
        .map_err(|err| format!("open bag file {} failed: {err}", input.display()))?;
    let entries = replay_file.entries();
    let total_entries = entries.len();
    let mut session = ReplaySession::new(entries, speed.clone());
    let started_at = std::time::Instant::now();

    let mut emitted = 0usize;
    while !session.is_done() {
        let count = session.pump(|_entry| {});
        emitted += count;
        if count == 0 {
            // Nothing was emitted by this pump: back off briefly instead of
            // spinning at full speed.
            std::thread::sleep(Duration::from_millis(1));
        }
    }

    // `as_nanos` yields a `u128`; clamp to `u64::MAX` rather than letting a
    // plain `as` cast wrap silently when narrowing.
    let elapsed_ns = started_at
        .elapsed()
        .as_nanos()
        .min(u128::from(u64::MAX)) as u64;
    let topic_count = entries
        .iter()
        .map(|entry| entry.topic.as_str())
        .collect::<HashSet<_>>()
        .len();
    let capture_span_ns = match (entries.first(), entries.last()) {
        (Some(first), Some(last)) => {
            (last.captured_at.0.saturating_sub(first.captured_at.0)) as u64
        }
        _ => 0,
    };

    if json {
        let payload = serde_json::json!({
            "input": input,
            "speed": replay_speed_label(&speed),
            "total_entries": total_entries,
            "emitted_entries": emitted,
            "topic_count": topic_count,
            "capture_span_ns": capture_span_ns,
            "replay_elapsed_ns": elapsed_ns,
        });
        let out = serde_json::to_string_pretty(&payload)
            .map_err(|err| format!("serialize bag replay summary failed: {err}"))?;
        println!("{out}");
    } else {
        println!("RobotRT Bag Replay");
        println!("input: {}", input.display());
        println!("speed: {}", replay_speed_label(&speed));
        println!("total_entries: {}", total_entries);
        println!("emitted_entries: {}", emitted);
        println!("topic_count: {}", topic_count);
        println!("capture_span_ns: {}", capture_span_ns);
        println!("replay_elapsed_ns: {}", elapsed_ns);
    }

    Ok(())
}