//! Bag recording and playback.
//!
//! [`BagRecorder`] subscribes to a set of topics and streams their messages to
//! a binary bag file; [`BagPlayer`] reads a bag file back and republishes the
//! recorded messages with their original relative timing.

use crate::{Context, Mecha10Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

/// Magic bytes written at the start of every bag file.
const MAGIC_BYTES: &[u8; 8] = b"METABAG1";

/// Bag format version written into, and expected in, the header.
const VERSION: u32 = 1;

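// On-disk bag layout (see `write_header` and `write_chunks` below):
//
//   MAGIC_BYTES (8) | version: u32 LE | compression: u8
//   | metadata length: u64 LE | metadata: CBOR (`BagMetadata`)
//   | repeated chunks:
//       timestamp_us: u64 LE
//       | topic length: u32 LE | topic: UTF-8
//       | data length: u32 LE  | data: CBOR payload
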
/// Compression mode recorded in the bag header.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum Compression {
    /// No compression (default).
    #[default]
    None = 0,
    /// Zstandard compression.
    Zstd = 1,
    /// Gzip compression.
    Gzip = 2,
}

impl From<u8> for Compression {
    fn from(v: u8) -> Self {
        match v {
            1 => Compression::Zstd,
            2 => Compression::Gzip,
            _ => Compression::None,
        }
    }
}

/// Summary information stored in the bag header.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BagMetadata {
    /// Topics that were recorded.
    pub topics: Vec<String>,

    /// Number of messages recorded per topic.
    pub message_counts: HashMap<String, u64>,

    /// Recording start timestamp in microseconds.
    pub start_time_us: u64,

    /// Recording end timestamp in microseconds.
    pub end_time_us: u64,

    /// Total recording duration in seconds.
    pub duration_sec: f64,

    /// Total number of recorded messages across all topics.
    pub total_messages: u64,

    /// Compression mode declared for the bag.
    pub compression: Compression,

    /// Free-form user metadata.
    #[serde(default)]
    pub custom: HashMap<String, String>,
}

/// A single recorded message.
#[derive(Debug, Clone)]
struct MessageChunk {
    /// Capture timestamp in microseconds.
    timestamp_us: u64,

    /// Topic the message was received on.
    topic: String,

    /// CBOR-encoded message payload.
    data: Vec<u8>,
}

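/// Configuration for [`BagRecorder`].
///
/// All fields other than `output_path` have serde defaults, so a minimal
/// config only needs the output path. Since the struct derives `Deserialize`,
/// it can be loaded from e.g. JSON; an illustrative example (values and topic
/// names are examples only):
///
/// ```json
/// {
///   "output_path": "recording.bag",
///   "topics": ["/camera/image", "/imu"],
///   "compression": "None",
///   "buffer_size": 1000
/// }
/// ```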
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecorderConfig {
    /// Path of the bag file to write.
    pub output_path: PathBuf,

    /// Topics to record.
    #[serde(default)]
    pub topics: Vec<String>,

    /// Compression mode to record in the header.
    #[serde(default)]
    pub compression: Compression,

    /// Number of buffered messages that triggers a flush to disk.
    #[serde(default = "default_buffer_size")]
    pub buffer_size: usize,

    /// Maximum bag file size in bytes.
    #[serde(default)]
    pub max_file_size: u64,

    /// Split into a new file when `max_file_size` is exceeded.
    #[serde(default)]
    pub split_on_max_size: bool,
}

fn default_buffer_size() -> usize {
    1000
}

impl Default for RecorderConfig {
    fn default() -> Self {
        Self {
            output_path: PathBuf::from("recording.bag"),
            topics: Vec::new(),
            compression: Compression::None,
            buffer_size: default_buffer_size(),
            max_file_size: 0,
            split_on_max_size: false,
        }
    }
}

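/// Records messages from subscribed topics into a bag file.
///
/// Sketch of typical usage (the topic names are illustrative and a `Context`
/// is assumed to be provided by the host application):
///
/// ```ignore
/// let config = RecorderConfig {
///     output_path: "run1.bag".into(),
///     topics: vec!["/camera/image".into(), "/imu".into()],
///     ..Default::default()
/// };
/// let mut recorder = BagRecorder::new(ctx, config).await?;
/// recorder.start().await?;
/// // ... record for a while ...
/// recorder.stop().await?;
/// ```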
pub struct BagRecorder {
    config: RecorderConfig,
    ctx: Arc<Context>,
    buffer: Arc<RwLock<Vec<MessageChunk>>>,
    metadata: Arc<RwLock<BagMetadata>>,
    running: Arc<RwLock<bool>>,
    write_task: Option<tokio::task::JoinHandle<()>>,
}

impl BagRecorder {
    /// Creates a new recorder with initial metadata; nothing is written to
    /// disk until [`start`](Self::start) is called.
    pub async fn new(ctx: Context, config: RecorderConfig) -> Result<Self> {
        let metadata = BagMetadata {
            topics: config.topics.clone(),
            message_counts: HashMap::new(),
            start_time_us: crate::prelude::now_micros(),
            end_time_us: 0,
            duration_sec: 0.0,
            total_messages: 0,
            compression: config.compression,
            custom: HashMap::new(),
        };

        Ok(Self {
            config,
            ctx: Arc::new(ctx),
            buffer: Arc::new(RwLock::new(Vec::new())),
            metadata: Arc::new(RwLock::new(metadata)),
            running: Arc::new(RwLock::new(false)),
            write_task: None,
        })
    }

    /// Starts recording: writes the bag header, spawns the background writer
    /// task, and subscribes to each configured topic.
    pub async fn start(&mut self) -> Result<()> {
        *self.running.write().await = true;

        info!("Starting bag recorder: {:?}", self.config.output_path);

        // Write the initial header; it is rewritten with final metadata on stop().
        self.write_header().await?;

        let buffer = Arc::clone(&self.buffer);
        let _metadata = Arc::clone(&self.metadata);
        let running = Arc::clone(&self.running);
        let output_path = self.config.output_path.clone();
        let compression = self.config.compression;
        let buffer_size = self.config.buffer_size;

        // Background writer task: flushes the buffer to disk once it reaches
        // `buffer_size`, and flushes any remainder when recording stops.
        let write_task = tokio::spawn(async move {
            while *running.read().await {
                tokio::time::sleep(Duration::from_millis(100)).await;

                let mut buf = buffer.write().await;
                if buf.len() >= buffer_size {
                    if let Err(e) = Self::write_chunks(&output_path, &buf, compression).await {
                        warn!("Failed to write chunks: {}", e);
                    }
                    buf.clear();
                }
            }

            let buf = buffer.read().await;
            if !buf.is_empty() {
                if let Err(e) = Self::write_chunks(&output_path, &buf, compression).await {
                    warn!("Failed to write final chunks: {}", e);
                }
            }
        });

        self.write_task = Some(write_task);

        // One subscriber task per topic: each incoming message is CBOR-encoded
        // and pushed into the shared buffer.
        for topic in &self.config.topics {
            let topic_clone = topic.clone();
            let buffer = Arc::clone(&self.buffer);
            let metadata = Arc::clone(&self.metadata);
            let running = Arc::clone(&self.running);
            let ctx = Arc::clone(&self.ctx);

            tokio::spawn(async move {
                match ctx.subscribe_raw::<serde_json::Value>(&topic_clone).await {
                    Ok(mut rx) => {
                        while *running.read().await {
                            if let Some(msg) = rx.recv().await {
                                if let Ok(data) = serde_cbor::to_vec(&msg) {
                                    let chunk = MessageChunk {
                                        timestamp_us: crate::prelude::now_micros(),
                                        topic: topic_clone.clone(),
                                        data,
                                    };

                                    buffer.write().await.push(chunk);

                                    let mut meta = metadata.write().await;
                                    *meta.message_counts.entry(topic_clone.clone()).or_insert(0) += 1;
                                    meta.total_messages += 1;
                                }
                            } else {
                                // Subscription channel closed; stop recording this topic.
                                break;
                            }
                        }
                    }
                    Err(e) => {
                        warn!("Failed to subscribe to {}: {}", topic_clone, e);
                    }
                }
            });
        }

        Ok(())
    }

    /// Stops recording, flushes any remaining buffered messages, and rewrites
    /// the header with the final metadata.
    pub async fn stop(&mut self) -> Result<()> {
        *self.running.write().await = false;

        info!("Stopping bag recorder");

        if let Some(task) = self.write_task.take() {
            task.await
                .map_err(|e| Mecha10Error::Other(format!("Write task error: {}", e)))?;
        }

        // Record final timing, then drop the write guard before calling
        // write_header(), which takes its own read lock on the metadata.
        let (total_messages, duration_sec) = {
            let mut meta = self.metadata.write().await;
            meta.end_time_us = crate::prelude::now_micros();
            meta.duration_sec = (meta.end_time_us - meta.start_time_us) as f64 / 1_000_000.0;
            (meta.total_messages, meta.duration_sec)
        };

        // write_header() truncates the file and the final metadata can differ
        // in length, so preserve the chunk section already on disk and
        // re-append it after rewriting the header.
        let chunks = Self::read_chunk_section(&self.config.output_path).unwrap_or_default();
        self.write_header().await?;
        if !chunks.is_empty() {
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&self.config.output_path)
                .map_err(Mecha10Error::Io)?;
            file.write_all(&chunks).map_err(Mecha10Error::Io)?;
        }

        info!(
            "Recording complete: {} messages, {:.1}s",
            total_messages, duration_sec
        );

        Ok(())
    }

    /// Returns a snapshot of the current recording metadata.
    pub async fn metadata(&self) -> BagMetadata {
        self.metadata.read().await.clone()
    }

    /// Writes the bag header: magic bytes, format version, compression flag,
    /// and the length-prefixed CBOR-encoded metadata. Truncates the file.
    async fn write_header(&self) -> Result<()> {
        let meta = self.metadata.read().await;

        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&self.config.output_path)
            .map_err(Mecha10Error::Io)?;

        file.write_all(MAGIC_BYTES).map_err(Mecha10Error::Io)?;

        file.write_all(&VERSION.to_le_bytes()).map_err(Mecha10Error::Io)?;

        file.write_all(&[self.config.compression as u8])
            .map_err(Mecha10Error::Io)?;

        let meta_bytes =
            serde_cbor::to_vec(&*meta).map_err(|e| Mecha10Error::Other(format!("Serialization error: {}", e)))?;

        file.write_all(&(meta_bytes.len() as u64).to_le_bytes())
            .map_err(Mecha10Error::Io)?;

        file.write_all(&meta_bytes).map_err(Mecha10Error::Io)?;

        Ok(())
    }

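    /// Reads everything after the header section (magic, version, compression
    /// byte, metadata length, metadata) so that the chunk data already on disk
    /// can be preserved when [`stop`](Self::stop) rewrites the header.
    fn read_chunk_section(path: &PathBuf) -> Result<Vec<u8>> {
        let mut file = std::fs::File::open(path).map_err(Mecha10Error::Io)?;

        // Skip magic (8) + version (4) + compression (1).
        let mut fixed = [0u8; 13];
        file.read_exact(&mut fixed).map_err(Mecha10Error::Io)?;

        // Skip the length-prefixed metadata block.
        let mut meta_len_bytes = [0u8; 8];
        file.read_exact(&mut meta_len_bytes).map_err(Mecha10Error::Io)?;
        let mut meta_bytes = vec![0u8; u64::from_le_bytes(meta_len_bytes) as usize];
        file.read_exact(&mut meta_bytes).map_err(Mecha10Error::Io)?;

        // Everything that remains is the chunk section.
        let mut chunks = Vec::new();
        file.read_to_end(&mut chunks).map_err(Mecha10Error::Io)?;
        Ok(chunks)
    }
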
    /// Appends message chunks to the bag file. The compression mode is
    /// accepted but not yet applied to the payload.
    async fn write_chunks(path: &PathBuf, chunks: &[MessageChunk], _compression: Compression) -> Result<()> {
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .map_err(Mecha10Error::Io)?;

        for chunk in chunks {
            file.write_all(&chunk.timestamp_us.to_le_bytes())
                .map_err(Mecha10Error::Io)?;

            let topic_bytes = chunk.topic.as_bytes();
            file.write_all(&(topic_bytes.len() as u32).to_le_bytes())
                .map_err(Mecha10Error::Io)?;
            file.write_all(topic_bytes).map_err(Mecha10Error::Io)?;

            file.write_all(&(chunk.data.len() as u32).to_le_bytes())
                .map_err(Mecha10Error::Io)?;
            file.write_all(&chunk.data).map_err(Mecha10Error::Io)?;
        }

        Ok(())
    }
}

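/// Configuration for [`BagPlayer`].
///
/// All fields other than `bag_path` have serde defaults. Since the struct
/// derives `Deserialize`, it can be loaded from e.g. JSON; an illustrative
/// example (values are examples only):
///
/// ```json
/// {
///   "bag_path": "recording.bag",
///   "rate": 2.0,
///   "loop_playback": true
/// }
/// ```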
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayerConfig {
    /// Path of the bag file to play.
    pub bag_path: PathBuf,

    /// If set, only these topics are replayed.
    #[serde(default)]
    pub topics: Option<Vec<String>>,

    /// Playback rate multiplier (1.0 = real time).
    #[serde(default = "default_rate")]
    pub rate: f64,

    /// Restart playback from the beginning when the end is reached.
    #[serde(default)]
    pub loop_playback: bool,

    /// Start playback in the paused state.
    #[serde(default)]
    pub start_paused: bool,

    /// Offset in microseconds to skip to before playback.
    #[serde(default)]
    pub skip_to_us: u64,
}

fn default_rate() -> f64 {
    1.0
}

impl Default for PlayerConfig {
    fn default() -> Self {
        Self {
            bag_path: PathBuf::from("recording.bag"),
            topics: None,
            rate: 1.0,
            loop_playback: false,
            start_paused: false,
            skip_to_us: 0,
        }
    }
}

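/// Replays a recorded bag file, republishing messages with their original
/// relative timing scaled by the configured rate.
///
/// Sketch of typical usage (a `Context` is assumed to be provided by the host
/// application):
///
/// ```ignore
/// let config = PlayerConfig {
///     bag_path: "run1.bag".into(),
///     rate: 2.0,
///     ..Default::default()
/// };
/// let mut player = BagPlayer::new(ctx, config).await?;
/// player.play().await?;
/// ```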
pub struct BagPlayer {
    config: PlayerConfig,
    ctx: Arc<Context>,
    metadata: BagMetadata,
    chunks: Vec<MessageChunk>,
    paused: Arc<RwLock<bool>>,
    playing: Arc<RwLock<bool>>,
}

impl BagPlayer {
    /// Loads a bag file from disk and prepares it for playback.
    pub async fn new(ctx: Context, config: PlayerConfig) -> Result<Self> {
        info!("Loading bag file: {:?}", config.bag_path);

        let (metadata, chunks) = Self::read_bag_file(&config.bag_path).await?;

        info!(
            "Loaded bag: {} messages, {:.1}s duration, {} topics",
            metadata.total_messages,
            metadata.duration_sec,
            metadata.topics.len()
        );

        Ok(Self {
            config,
            ctx: Arc::new(ctx),
            metadata,
            chunks,
            paused: Arc::new(RwLock::new(false)),
            playing: Arc::new(RwLock::new(false)),
        })
    }

    /// Plays the loaded bag, publishing each message at its recorded offset
    /// scaled by the playback rate. Blocks until playback finishes or is
    /// stopped.
    pub async fn play(&mut self) -> Result<()> {
        if self.config.start_paused {
            *self.paused.write().await = true;
        }

        *self.playing.write().await = true;

        info!("Starting playback at {}x speed", self.config.rate);

        loop {
            let start_time = self.chunks.first().map(|c| c.timestamp_us).unwrap_or(0);
            let playback_start = std::time::Instant::now();

            for chunk in &self.chunks {
                // Block while paused.
                while *self.paused.read().await {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }

                if !*self.playing.read().await {
                    break;
                }

                // Skip topics outside the optional filter.
                if let Some(topics) = &self.config.topics {
                    if !topics.contains(&chunk.topic) {
                        continue;
                    }
                }

                // Sleep until the chunk's recorded offset (scaled by the
                // playback rate) has elapsed on the wall clock.
                let msg_time_offset = chunk.timestamp_us.saturating_sub(start_time);
                let real_time_offset = (msg_time_offset as f64 / self.config.rate) as u64;
                let target_time = playback_start + Duration::from_micros(real_time_offset);

                if let Some(sleep_duration) = target_time.checked_duration_since(std::time::Instant::now()) {
                    tokio::time::sleep(sleep_duration).await;
                }

                match serde_cbor::from_slice::<serde_json::Value>(&chunk.data) {
                    Ok(msg) => {
                        if let Err(e) = self.ctx.publish_raw(&chunk.topic, &msg).await {
                            warn!("Failed to publish to {}: {}", chunk.topic, e);
                        } else {
                            debug!("Replayed message on {}", chunk.topic);
                        }
                    }
                    Err(e) => {
                        warn!("Failed to deserialize message: {}", e);
                    }
                }
            }

            // Loop only if configured to and playback was not stopped.
            if !self.config.loop_playback || !*self.playing.read().await {
                break;
            }

            info!("Looping playback");
        }

        *self.playing.write().await = false;
        info!("Playback complete");

        Ok(())
    }

    /// Pauses playback; no messages are published until [`resume`](Self::resume).
    pub async fn pause(&self) {
        *self.paused.write().await = true;
        info!("Playback paused");
    }

    /// Resumes a paused playback.
    pub async fn resume(&self) {
        *self.paused.write().await = false;
        info!("Playback resumed");
    }

    /// Stops playback; [`play`](Self::play) returns after the current message.
    pub async fn stop(&self) {
        *self.playing.write().await = false;
        info!("Playback stopped");
    }

    /// Returns the metadata read from the bag header.
    pub fn metadata(&self) -> &BagMetadata {
        &self.metadata
    }

    /// Reads a bag file from disk, returning its metadata and all message chunks.
    async fn read_bag_file(path: &PathBuf) -> Result<(BagMetadata, Vec<MessageChunk>)> {
        let mut file = std::fs::File::open(path).map_err(Mecha10Error::Io)?;

        // Validate the magic bytes.
        let mut magic = [0u8; 8];
        file.read_exact(&mut magic).map_err(Mecha10Error::Io)?;

        if &magic != MAGIC_BYTES {
            return Err(Mecha10Error::Other("Invalid bag file format".to_string()));
        }

        // Check the format version.
        let mut version_bytes = [0u8; 4];
        file.read_exact(&mut version_bytes).map_err(Mecha10Error::Io)?;
        let version = u32::from_le_bytes(version_bytes);

        if version != VERSION {
            return Err(Mecha10Error::Other(format!("Unsupported bag version: {}", version)));
        }

        let mut compression_byte = [0u8; 1];
        file.read_exact(&mut compression_byte).map_err(Mecha10Error::Io)?;
        let _compression = Compression::from(compression_byte[0]);

        // Read the length-prefixed metadata block.
        let mut meta_len_bytes = [0u8; 8];
        file.read_exact(&mut meta_len_bytes).map_err(Mecha10Error::Io)?;
        let meta_len = u64::from_le_bytes(meta_len_bytes) as usize;

        let mut meta_bytes = vec![0u8; meta_len];
        file.read_exact(&mut meta_bytes).map_err(Mecha10Error::Io)?;

        let metadata: BagMetadata = serde_cbor::from_slice(&meta_bytes)
            .map_err(|e| Mecha10Error::Other(format!("Failed to parse metadata: {}", e)))?;

        // Read chunks until end of file.
        let mut chunks = Vec::new();

        loop {
            let mut timestamp_bytes = [0u8; 8];
            if file.read_exact(&mut timestamp_bytes).is_err() {
                break;
            }
            let timestamp_us = u64::from_le_bytes(timestamp_bytes);

            let mut topic_len_bytes = [0u8; 4];
            file.read_exact(&mut topic_len_bytes).map_err(Mecha10Error::Io)?;
            let topic_len = u32::from_le_bytes(topic_len_bytes) as usize;

            let mut topic_bytes = vec![0u8; topic_len];
            file.read_exact(&mut topic_bytes).map_err(Mecha10Error::Io)?;
            let topic = String::from_utf8(topic_bytes)
                .map_err(|e| Mecha10Error::Other(format!("Invalid topic string: {}", e)))?;

            let mut data_len_bytes = [0u8; 4];
            file.read_exact(&mut data_len_bytes).map_err(Mecha10Error::Io)?;
            let data_len = u32::from_le_bytes(data_len_bytes) as usize;

            let mut data = vec![0u8; data_len];
            file.read_exact(&mut data).map_err(Mecha10Error::Io)?;

            chunks.push(MessageChunk {
                timestamp_us,
                topic,
                data,
            });
        }

        Ok((metadata, chunks))
    }
}