ros2kit 0.2.1

Rust utility library for ROS 2 workspace management and ament environment discovery
Documentation
use std::collections::BTreeMap;
use std::fs::File;
use std::io::BufWriter;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use anyhow::Result;
use mcap::Writer;
use mcap::records::MessageHeader;
use mcap::write::WriteOptions;
use tokio::sync::{mpsc, watch};

use crate::ament::Env;

/// One topic to be recorded, together with the channel that supplies its
/// messages.
pub struct TopicRecording {
    /// Topic name; used as the topic of the MCAP channel created for this
    /// recording.
    pub topic: String,
    /// Message type name. It is passed to `Env::resolve_msg_definition` to
    /// look up the schema text and is also used as the MCAP schema name.
    pub type_name: String,
    /// Source of message payloads. Every `Vec<u8>` received here is written
    /// as one message on a channel declared with the `"cdr"` encoding.
    // NOTE(review): the payload is written as-is, so it is presumably expected
    // to already be CDR-serialized — confirm with the producers.
    pub rx: mpsc::Receiver<Vec<u8>>,
}

/// Input to [`Recorder::start`]: where to write and what to record.
pub struct RecordConfig {
    /// Path of the output file. It is opened with `File::create`, so an
    /// existing file at this path is truncated.
    pub output_path: PathBuf,
    /// Topics to record. Topics whose message definition cannot be resolved
    /// are skipped and reported in [`RecordResult::skipped_topics`].
    pub topics: Vec<TopicRecording>,
}

/// Running totals published by the background recording loop.
#[derive(Debug, Clone, Default)]
pub struct RecordStats {
    /// Number of messages taken from the topic receivers and handed to the
    /// MCAP writer so far.
    pub message_count: u64,
    /// Sum of the payload lengths of the messages counted in
    /// `message_count`. This is payload size only, not the size of the
    /// output file.
    pub bytes_written: u64,
    /// Time since the recording loop started, sampled when the most recent
    /// message was processed (it does not advance while no messages arrive).
    pub duration: Duration,
}

/// Returned by a successful [`Recorder::start`].
pub struct RecordResult {
    /// Watch channel on which the recording loop publishes an updated
    /// [`RecordStats`] after each message. It starts at
    /// `RecordStats::default()`.
    pub stats_rx: watch::Receiver<RecordStats>,
    /// Names of the requested topics that were not recorded because no
    /// message definition could be resolved for their type.
    pub skipped_topics: Vec<String>,
}

/// Controls a background task that records topic messages into an MCAP file.
pub struct Recorder {
    /// Flag shared with the background recording task: set while a recording
    /// is running, cleared by `stop` or by the task itself when it exits.
    active: Arc<AtomicBool>,
    /// Handle of the task spawned by `start`; `None` before the first
    /// recording and after `stop`.
    task: Option<tokio::task::JoinHandle<()>>,
}

impl Default for Recorder {
    fn default() -> Self {
        Self::new()
    }
}

impl Recorder {
    pub fn new() -> Self {
        Self {
            active: Arc::new(AtomicBool::new(false)),
            task: None,
        }
    }

    pub fn is_recording(&self) -> bool {
        self.active.load(Ordering::Relaxed)
    }

    pub fn start(&mut self, config: RecordConfig, env: &Env) -> Result<RecordResult> {
        if self.is_recording() {
            anyhow::bail!("already recording");
        }

        let file = File::create(&config.output_path)?;
        let buf = BufWriter::new(file);
        let mut writer = WriteOptions::new().profile("ros2").create(buf)?;

        let mut channels = Vec::new();
        let mut receivers = Vec::new();
        let mut skipped_topics = Vec::new();

        for topic_rec in config.topics {
            let definition = match env.resolve_msg_definition(&topic_rec.type_name) {
                Some(def) => def,
                None => {
                    skipped_topics.push(topic_rec.topic.clone());
                    continue;
                }
            };

            let schema_id =
                writer.add_schema(&topic_rec.type_name, "ros2msg", definition.as_bytes())?;

            let channel_id =
                writer.add_channel(schema_id, &topic_rec.topic, "cdr", &BTreeMap::new())?;

            channels.push(channel_id);
            receivers.push(topic_rec.rx);
        }

        if receivers.is_empty() {
            anyhow::bail!("no topics could be recorded (all .msg definitions unresolvable)");
        }

        let (stats_tx, stats_rx) = watch::channel(RecordStats::default());
        let active = self.active.clone();
        active.store(true, Ordering::Relaxed);

        let handle = tokio::spawn(async move {
            let _ = record_loop(writer, channels, receivers, stats_tx, active).await;
        });

        self.task = Some(handle);

        Ok(RecordResult {
            stats_rx,
            skipped_topics,
        })
    }

    pub fn stop(&mut self) {
        self.active.store(false, Ordering::Relaxed);
        self.task = None;
    }
}

async fn record_loop(
    mut writer: Writer<BufWriter<File>>,
    channels: Vec<u16>,
    mut receivers: Vec<mpsc::Receiver<Vec<u8>>>,
    stats_tx: watch::Sender<RecordStats>,
    active: Arc<AtomicBool>,
) -> Result<()> {
    let start = Instant::now();
    let mut stats = RecordStats::default();
    let mut sequence: u32 = 0;

    loop {
        if !active.load(Ordering::Relaxed) {
            break;
        }

        let mut all_closed = true;
        let mut got_message = false;

        for (i, rx) in receivers.iter_mut().enumerate() {
            match rx.try_recv() {
                Ok(data) => {
                    all_closed = false;
                    got_message = true;

                    let now = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_nanos() as u64;

                    let header = MessageHeader {
                        channel_id: channels[i],
                        sequence,
                        log_time: now,
                        publish_time: now,
                    };

                    stats.bytes_written += data.len() as u64;
                    stats.message_count += 1;
                    stats.duration = start.elapsed();
                    sequence = sequence.wrapping_add(1);

                    let _ = writer.write_to_known_channel(&header, &data);
                    let _ = stats_tx.send(stats.clone());
                }
                Err(mpsc::error::TryRecvError::Empty) => {
                    all_closed = false;
                }
                Err(mpsc::error::TryRecvError::Disconnected) => {}
            }
        }

        if all_closed {
            break;
        }

        if !got_message {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
    }

    active.store(false, Ordering::Relaxed);
    writer.finish()?;
    Ok(())
}