// mecha10_core/recorder.rs

//! Bag File Recording and Replay (ROS bag-like)
//!
//! Records messages from arbitrary topics to bag files and replays them later.
//! Similar to ROS bags but designed for the Mecha10 framework.
//!
//! # Features
//!
//! - Record messages from any topic with timestamps
//! - Replay messages with original or custom timing
//! - Multiple recording formats (CBOR, MessagePack, JSON)
//! - Compression support (zstd, gzip)
//! - Topic filtering on record and replay
//! - Time manipulation (speed up, slow down, step through)
//! - Metadata tracking (topics, message counts, duration)
//!
//! # File Format
//!
//! Bag files use a simple chunked format:
//!
//! ```text
//! [Header]
//! - Magic bytes: "METABAG1"
//! - Version: u32
//! - Compression: u8 (0=none, 1=zstd, 2=gzip)
//! - Metadata length: u64
//! - Metadata: CBOR-encoded BagMetadata
//!
//! [Chunk 0]
//! - Timestamp: u64 (microseconds since epoch)
//! - Topic length: u32
//! - Topic: String
//! - Message length: u32
//! - Message data: Vec<u8> (serialized message)
//!
//! [Chunk 1]
//! ...
//! ```
//!
//! # Example: Recording
//!
//! ```rust,no_run
//! use mecha10::prelude::*;
//! use mecha10::recorder::{BagRecorder, RecorderConfig};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let ctx = Context::new("recorder").await?;
//!
//!     let config = RecorderConfig {
//!         output_path: "/tmp/recording.bag".into(),
//!         topics: vec![
//!             "/sensor/camera/rgb".to_string(),
//!             "/sensor/lidar/scan".to_string(),
//!         ],
//!         compression: Compression::Zstd,
//!         buffer_size: 1000,
//!         ..Default::default()
//!     };
//!
//!     let mut recorder = BagRecorder::new(ctx, config).await?;
//!     recorder.start().await?;
//!
//!     // Record for 60 seconds
//!     tokio::time::sleep(Duration::from_secs(60)).await;
//!
//!     recorder.stop().await?;
//!     println!("Recording saved: {:?}", recorder.metadata());
//!
//!     Ok(())
//! }
//! ```
//!
//! # Example: Replay
//!
//! ```rust,no_run
//! use mecha10::prelude::*;
//! use mecha10::recorder::{BagPlayer, PlayerConfig};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let ctx = Context::new("player").await?;
//!
//!     let config = PlayerConfig {
//!         bag_path: "/tmp/recording.bag".into(),
//!         topics: None, // Replay all topics
//!         rate: 1.0, // Real-time speed
//!         loop_playback: false,
//!         start_paused: false,
//!         ..Default::default()
//!     };
//!
//!     let mut player = BagPlayer::new(ctx, config).await?;
//!     player.play().await?;
//!
//!     Ok(())
//! }
//! ```
use crate::{Context, Mecha10Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
106
107// ============================================================================
108// Constants
109// ============================================================================
110
111const MAGIC_BYTES: &[u8; 8] = b"METABAG1";
112const VERSION: u32 = 1;
113
114// ============================================================================
115// Types
116// ============================================================================
117
118/// Compression format
119#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
120pub enum Compression {
121    /// No compression
122    #[default]
123    None = 0,
124    /// Zstandard compression (fast, good ratio)
125    Zstd = 1,
126    /// Gzip compression (slower, better compression)
127    Gzip = 2,
128}
129
130impl From<u8> for Compression {
131    fn from(v: u8) -> Self {
132        match v {
133            1 => Compression::Zstd,
134            2 => Compression::Gzip,
135            _ => Compression::None,
136        }
137    }
138}
139
140/// Bag file metadata
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct BagMetadata {
143    /// Topics recorded
144    pub topics: Vec<String>,
145
146    /// Message count per topic
147    pub message_counts: HashMap<String, u64>,
148
149    /// Start timestamp (microseconds since epoch)
150    pub start_time_us: u64,
151
152    /// End timestamp (microseconds since epoch)
153    pub end_time_us: u64,
154
155    /// Duration in seconds
156    pub duration_sec: f64,
157
158    /// Total message count
159    pub total_messages: u64,
160
161    /// Compression used
162    pub compression: Compression,
163
164    /// Custom metadata (key-value pairs)
165    #[serde(default)]
166    pub custom: HashMap<String, String>,
167}
168
169/// Message chunk in bag file
170#[derive(Debug, Clone)]
171struct MessageChunk {
172    /// Timestamp (microseconds since epoch)
173    timestamp_us: u64,
174
175    /// Topic name
176    topic: String,
177
178    /// Serialized message data
179    data: Vec<u8>,
180}
181
182// ============================================================================
183// Recorder Configuration
184// ============================================================================
185
186/// Recorder configuration
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct RecorderConfig {
189    /// Output bag file path
190    pub output_path: PathBuf,
191
192    /// Topics to record (empty = record all)
193    #[serde(default)]
194    pub topics: Vec<String>,
195
196    /// Compression format
197    #[serde(default)]
198    pub compression: Compression,
199
200    /// Buffer size (number of messages to buffer before writing)
201    #[serde(default = "default_buffer_size")]
202    pub buffer_size: usize,
203
204    /// Maximum file size in bytes (0 = unlimited)
205    #[serde(default)]
206    pub max_file_size: u64,
207
208    /// Split files when max size reached
209    #[serde(default)]
210    pub split_on_max_size: bool,
211}
212
213fn default_buffer_size() -> usize {
214    1000
215}
216
217impl Default for RecorderConfig {
218    fn default() -> Self {
219        Self {
220            output_path: PathBuf::from("recording.bag"),
221            topics: Vec::new(),
222            compression: Compression::None,
223            buffer_size: default_buffer_size(),
224            max_file_size: 0,
225            split_on_max_size: false,
226        }
227    }
228}
229
230// ============================================================================
231// Bag Recorder
232// ============================================================================
233
234/// Bag file recorder service
235///
236/// Records messages from specified topics to a bag file.
237pub struct BagRecorder {
238    config: RecorderConfig,
239    ctx: Arc<Context>,
240    buffer: Arc<RwLock<Vec<MessageChunk>>>,
241    metadata: Arc<RwLock<BagMetadata>>,
242    running: Arc<RwLock<bool>>,
243    write_task: Option<tokio::task::JoinHandle<()>>,
244}
245
246impl BagRecorder {
247    /// Create a new bag recorder
248    pub async fn new(ctx: Context, config: RecorderConfig) -> Result<Self> {
249        let metadata = BagMetadata {
250            topics: config.topics.clone(),
251            message_counts: HashMap::new(),
252            start_time_us: crate::prelude::now_micros(),
253            end_time_us: 0,
254            duration_sec: 0.0,
255            total_messages: 0,
256            compression: config.compression,
257            custom: HashMap::new(),
258        };
259
260        Ok(Self {
261            config,
262            ctx: Arc::new(ctx),
263            buffer: Arc::new(RwLock::new(Vec::new())),
264            metadata: Arc::new(RwLock::new(metadata)),
265            running: Arc::new(RwLock::new(false)),
266            write_task: None,
267        })
268    }
269
270    /// Start recording
271    pub async fn start(&mut self) -> Result<()> {
272        *self.running.write().await = true;
273
274        info!("Starting bag recorder: {:?}", self.config.output_path);
275
276        // Write header
277        self.write_header().await?;
278
279        // Spawn background task to write buffered messages
280        let buffer = Arc::clone(&self.buffer);
281        let _metadata = Arc::clone(&self.metadata);
282        let running = Arc::clone(&self.running);
283        let output_path = self.config.output_path.clone();
284        let compression = self.config.compression;
285        let buffer_size = self.config.buffer_size;
286
287        let write_task = tokio::spawn(async move {
288            while *running.read().await {
289                tokio::time::sleep(Duration::from_millis(100)).await;
290
291                let mut buf = buffer.write().await;
292                if buf.len() >= buffer_size {
293                    if let Err(e) = Self::write_chunks(&output_path, &buf, compression).await {
294                        warn!("Failed to write chunks: {}", e);
295                    }
296                    buf.clear();
297                }
298            }
299
300            // Final flush
301            let buf = buffer.read().await;
302            if !buf.is_empty() {
303                if let Err(e) = Self::write_chunks(&output_path, &buf, compression).await {
304                    warn!("Failed to write final chunks: {}", e);
305                }
306            }
307        });
308
309        self.write_task = Some(write_task);
310
311        // Subscribe to topics and record messages
312        for topic in &self.config.topics {
313            let topic_clone = topic.clone();
314            let buffer = Arc::clone(&self.buffer);
315            let metadata = Arc::clone(&self.metadata);
316            let running = Arc::clone(&self.running);
317            let ctx = Arc::clone(&self.ctx);
318
319            tokio::spawn(async move {
320                // Subscribe to raw bytes to avoid deserialization
321                match ctx.subscribe_raw::<serde_json::Value>(&topic_clone).await {
322                    Ok(mut rx) => {
323                        while *running.read().await {
324                            if let Some(msg) = rx.recv().await {
325                                // Serialize to CBOR
326                                if let Ok(data) = serde_cbor::to_vec(&msg) {
327                                    let chunk = MessageChunk {
328                                        timestamp_us: crate::prelude::now_micros(),
329                                        topic: topic_clone.clone(),
330                                        data,
331                                    };
332
333                                    buffer.write().await.push(chunk);
334
335                                    // Update metadata
336                                    let mut meta = metadata.write().await;
337                                    *meta.message_counts.entry(topic_clone.clone()).or_insert(0) += 1;
338                                    meta.total_messages += 1;
339                                }
340                            }
341                        }
342                    }
343                    Err(e) => {
344                        warn!("Failed to subscribe to {}: {}", topic_clone, e);
345                    }
346                }
347            });
348        }
349
350        Ok(())
351    }
352
353    /// Stop recording
354    pub async fn stop(&mut self) -> Result<()> {
355        *self.running.write().await = false;
356
357        info!("Stopping bag recorder");
358
359        // Wait for write task to complete
360        if let Some(task) = self.write_task.take() {
361            task.await
362                .map_err(|e| Mecha10Error::Other(format!("Write task error: {}", e)))?;
363        }
364
365        // Update metadata
366        let mut meta = self.metadata.write().await;
367        meta.end_time_us = crate::prelude::now_micros();
368        meta.duration_sec = (meta.end_time_us - meta.start_time_us) as f64 / 1_000_000.0;
369
370        // Write final metadata (overwrite header)
371        self.write_header().await?;
372
373        info!(
374            "Recording complete: {} messages, {:.1}s",
375            meta.total_messages, meta.duration_sec
376        );
377
378        Ok(())
379    }
380
381    /// Get current metadata
382    pub async fn metadata(&self) -> BagMetadata {
383        self.metadata.read().await.clone()
384    }
385
386    /// Write bag file header with metadata
387    async fn write_header(&self) -> Result<()> {
388        let meta = self.metadata.read().await;
389
390        let mut file = std::fs::OpenOptions::new()
391            .create(true)
392            .write(true)
393            .truncate(true)
394            .open(&self.config.output_path)
395            .map_err(Mecha10Error::Io)?;
396
397        // Write magic bytes
398        file.write_all(MAGIC_BYTES).map_err(Mecha10Error::Io)?;
399
400        // Write version
401        file.write_all(&VERSION.to_le_bytes()).map_err(Mecha10Error::Io)?;
402
403        // Write compression
404        file.write_all(&[self.config.compression as u8])
405            .map_err(Mecha10Error::Io)?;
406
407        // Write metadata
408        let meta_bytes =
409            serde_cbor::to_vec(&*meta).map_err(|e| Mecha10Error::Other(format!("Serialization error: {}", e)))?;
410
411        file.write_all(&(meta_bytes.len() as u64).to_le_bytes())
412            .map_err(Mecha10Error::Io)?;
413
414        file.write_all(&meta_bytes).map_err(Mecha10Error::Io)?;
415
416        Ok(())
417    }
418
419    /// Write message chunks to file
420    async fn write_chunks(path: &PathBuf, chunks: &[MessageChunk], _compression: Compression) -> Result<()> {
421        let mut file = std::fs::OpenOptions::new()
422            .create(true)
423            .append(true)
424            .open(path)
425            .map_err(Mecha10Error::Io)?;
426
427        for chunk in chunks {
428            // Write timestamp
429            file.write_all(&chunk.timestamp_us.to_le_bytes())
430                .map_err(Mecha10Error::Io)?;
431
432            // Write topic
433            let topic_bytes = chunk.topic.as_bytes();
434            file.write_all(&(topic_bytes.len() as u32).to_le_bytes())
435                .map_err(Mecha10Error::Io)?;
436            file.write_all(topic_bytes).map_err(Mecha10Error::Io)?;
437
438            // Write message data
439            file.write_all(&(chunk.data.len() as u32).to_le_bytes())
440                .map_err(Mecha10Error::Io)?;
441            file.write_all(&chunk.data).map_err(Mecha10Error::Io)?;
442        }
443
444        Ok(())
445    }
446}
447
448// ============================================================================
449// Player Configuration
450// ============================================================================
451
452/// Player configuration
453#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct PlayerConfig {
455    /// Bag file path to play
456    pub bag_path: PathBuf,
457
458    /// Topics to replay (None = replay all)
459    #[serde(default)]
460    pub topics: Option<Vec<String>>,
461
462    /// Playback rate (1.0 = real-time, 2.0 = 2x speed, 0.5 = half speed)
463    #[serde(default = "default_rate")]
464    pub rate: f64,
465
466    /// Loop playback
467    #[serde(default)]
468    pub loop_playback: bool,
469
470    /// Start paused (requires manual play command)
471    #[serde(default)]
472    pub start_paused: bool,
473
474    /// Skip to timestamp (microseconds from start)
475    #[serde(default)]
476    pub skip_to_us: u64,
477}
478
479fn default_rate() -> f64 {
480    1.0
481}
482
483impl Default for PlayerConfig {
484    fn default() -> Self {
485        Self {
486            bag_path: PathBuf::from("recording.bag"),
487            topics: None,
488            rate: 1.0,
489            loop_playback: false,
490            start_paused: false,
491            skip_to_us: 0,
492        }
493    }
494}
495
496// ============================================================================
497// Bag Player
498// ============================================================================
499
500/// Bag file player service
501///
502/// Replays messages from a bag file to their original topics.
503pub struct BagPlayer {
504    config: PlayerConfig,
505    ctx: Arc<Context>,
506    metadata: BagMetadata,
507    chunks: Vec<MessageChunk>,
508    paused: Arc<RwLock<bool>>,
509    playing: Arc<RwLock<bool>>,
510}
511
512impl BagPlayer {
513    /// Create a new bag player
514    pub async fn new(ctx: Context, config: PlayerConfig) -> Result<Self> {
515        info!("Loading bag file: {:?}", config.bag_path);
516
517        // Read and parse bag file
518        let (metadata, chunks) = Self::read_bag_file(&config.bag_path).await?;
519
520        info!(
521            "Loaded bag: {} messages, {:.1}s duration, {} topics",
522            metadata.total_messages,
523            metadata.duration_sec,
524            metadata.topics.len()
525        );
526
527        Ok(Self {
528            config,
529            ctx: Arc::new(ctx),
530            metadata,
531            chunks,
532            paused: Arc::new(RwLock::new(false)),
533            playing: Arc::new(RwLock::new(false)),
534        })
535    }
536
537    /// Start playback
538    pub async fn play(&mut self) -> Result<()> {
539        if self.config.start_paused {
540            *self.paused.write().await = true;
541        }
542
543        *self.playing.write().await = true;
544
545        info!("Starting playback at {}x speed", self.config.rate);
546
547        loop {
548            let start_time = self.chunks.first().map(|c| c.timestamp_us).unwrap_or(0);
549            let playback_start = std::time::Instant::now();
550
551            for chunk in &self.chunks {
552                // Wait for pause to be released
553                while *self.paused.read().await {
554                    tokio::time::sleep(Duration::from_millis(100)).await;
555                }
556
557                if !*self.playing.read().await {
558                    break;
559                }
560
561                // Apply topic filtering
562                if let Some(topics) = &self.config.topics {
563                    if !topics.contains(&chunk.topic) {
564                        continue;
565                    }
566                }
567
568                // Calculate sleep time based on playback rate
569                let msg_time_offset = chunk.timestamp_us - start_time;
570                let real_time_offset = (msg_time_offset as f64 / self.config.rate) as u64;
571                let target_time = playback_start + Duration::from_micros(real_time_offset);
572
573                if let Some(sleep_duration) = target_time.checked_duration_since(std::time::Instant::now()) {
574                    tokio::time::sleep(sleep_duration).await;
575                }
576
577                // Deserialize and publish message
578                match serde_cbor::from_slice::<serde_json::Value>(&chunk.data) {
579                    Ok(msg) => {
580                        if let Err(e) = self.ctx.publish_raw(&chunk.topic, &msg).await {
581                            warn!("Failed to publish to {}: {}", chunk.topic, e);
582                        } else {
583                            debug!("Replayed message on {}", chunk.topic);
584                        }
585                    }
586                    Err(e) => {
587                        warn!("Failed to deserialize message: {}", e);
588                    }
589                }
590            }
591
592            if !self.config.loop_playback {
593                break;
594            }
595
596            info!("Looping playback");
597        }
598
599        *self.playing.write().await = false;
600        info!("Playback complete");
601
602        Ok(())
603    }
604
605    /// Pause playback
606    pub async fn pause(&self) {
607        *self.paused.write().await = true;
608        info!("Playback paused");
609    }
610
611    /// Resume playback
612    pub async fn resume(&self) {
613        *self.paused.write().await = false;
614        info!("Playback resumed");
615    }
616
617    /// Stop playback
618    pub async fn stop(&self) {
619        *self.playing.write().await = false;
620        info!("Playback stopped");
621    }
622
623    /// Get bag metadata
624    pub fn metadata(&self) -> &BagMetadata {
625        &self.metadata
626    }
627
628    /// Read and parse bag file
629    async fn read_bag_file(path: &PathBuf) -> Result<(BagMetadata, Vec<MessageChunk>)> {
630        let mut file = std::fs::File::open(path).map_err(Mecha10Error::Io)?;
631
632        // Read magic bytes
633        let mut magic = [0u8; 8];
634        file.read_exact(&mut magic).map_err(Mecha10Error::Io)?;
635
636        if &magic != MAGIC_BYTES {
637            return Err(Mecha10Error::Other("Invalid bag file format".to_string()));
638        }
639
640        // Read version
641        let mut version_bytes = [0u8; 4];
642        file.read_exact(&mut version_bytes).map_err(Mecha10Error::Io)?;
643        let version = u32::from_le_bytes(version_bytes);
644
645        if version != VERSION {
646            return Err(Mecha10Error::Other(format!("Unsupported bag version: {}", version)));
647        }
648
649        // Read compression
650        let mut compression_byte = [0u8; 1];
651        file.read_exact(&mut compression_byte).map_err(Mecha10Error::Io)?;
652        let _compression = Compression::from(compression_byte[0]);
653
654        // Read metadata
655        let mut meta_len_bytes = [0u8; 8];
656        file.read_exact(&mut meta_len_bytes).map_err(Mecha10Error::Io)?;
657        let meta_len = u64::from_le_bytes(meta_len_bytes) as usize;
658
659        let mut meta_bytes = vec![0u8; meta_len];
660        file.read_exact(&mut meta_bytes).map_err(Mecha10Error::Io)?;
661
662        let metadata: BagMetadata = serde_cbor::from_slice(&meta_bytes)
663            .map_err(|e| Mecha10Error::Other(format!("Failed to parse metadata: {}", e)))?;
664
665        // Read message chunks
666        let mut chunks = Vec::new();
667
668        loop {
669            // Read timestamp
670            let mut timestamp_bytes = [0u8; 8];
671            if file.read_exact(&mut timestamp_bytes).is_err() {
672                break; // End of file
673            }
674            let timestamp_us = u64::from_le_bytes(timestamp_bytes);
675
676            // Read topic
677            let mut topic_len_bytes = [0u8; 4];
678            file.read_exact(&mut topic_len_bytes).map_err(Mecha10Error::Io)?;
679            let topic_len = u32::from_le_bytes(topic_len_bytes) as usize;
680
681            let mut topic_bytes = vec![0u8; topic_len];
682            file.read_exact(&mut topic_bytes).map_err(Mecha10Error::Io)?;
683            let topic = String::from_utf8(topic_bytes)
684                .map_err(|e| Mecha10Error::Other(format!("Invalid topic string: {}", e)))?;
685
686            // Read message data
687            let mut data_len_bytes = [0u8; 4];
688            file.read_exact(&mut data_len_bytes).map_err(Mecha10Error::Io)?;
689            let data_len = u32::from_le_bytes(data_len_bytes) as usize;
690
691            let mut data = vec![0u8; data_len];
692            file.read_exact(&mut data).map_err(Mecha10Error::Io)?;
693
694            chunks.push(MessageChunk {
695                timestamp_us,
696                topic,
697                data,
698            });
699        }
700
701        Ok((metadata, chunks))
702    }
703}