apex-io 0.2.0

File I/O for pose graphs (G2O, TORO, BAL) and ROS2 bags with SE2/SE3 support
Documentation

apex-io

High-performance file I/O for robotics data — pose graphs (G2O, TORO, BAL) and ROS2 bag files (SQLite3 and MCAP), with optional live DDS topic subscription.

Origin

The ROS2 bag reading and writing functionality in this crate was originally developed as a standalone Rust library at rosbags-rs and has since been merged into apex-io. Bags produced and read by this crate are interoperable with the ros2 bag CLI.

Overview

  • Pose graph I/O — G2O and TORO (2D/3D SLAM), BAL (bundle adjustment)
  • ROS2 bag reading — SQLite3 and MCAP formats, topic + time-range filtering, raw and deserialized APIs
  • ROS2 bag writing — SQLite3 and MCAP with optional zstd compression
  • Live DDS subscription — async topic listener for running ROS2 systems (optional dds feature)
  • 94+ ROS2 message types — CDR deserialization for geometry_msgs, sensor_msgs, nav_msgs, std_msgs, tf2_msgs, and more
  • Memory-mapped + parallel parsing — fast I/O on large files via memmap2 and rayon
  • CLI binariesbag_info, bag_filter, extract_topic_data, write_dummy_bag, dds_multi_listener

Supported Formats

Format Description Read Write
G2O General Graph Optimization (SE2 + SE3)
TORO Tree-based netwORk Optimizer (SE2 only)
BAL Bundle Adjustment in the Large
ROS2 bag / SQLite3 .db3 storage format
ROS2 bag / MCAP .mcap storage format

Installation

# Core: pose graphs + ROS2 bag reader/writer
[dependencies]
apex-io = "0.2.0"

# With Rerun visualization helpers
apex-io = { version = "0.2.0", features = ["visualization"] }

# With live DDS topic subscription (requires a DDS runtime)
apex-io = { version = "0.2.0", features = ["dds"] }

Note: apex-io depends on apex-manifolds (for SE2/SE3 types). Both crates must be available on crates.io. If you are using the workspace they are handled automatically.


ROS2 Bag Reading

Basic — iterate all messages

use apex_io::rosbag::Reader;

let mut reader = Reader::new("path/to/my_bag")?;
reader.open()?;

// Bag-level metadata
println!("Duration:  {:.2}s", reader.duration() as f64 / 1e9);
println!("Start:     {} ns", reader.start_time());
println!("End:       {} ns", reader.end_time());
println!("Messages:  {}", reader.message_count());

// List topics
for topic in reader.topics() {
    println!("  {} [{}]  {} msgs",
        topic.name, topic.message_type, topic.message_count);
}

// Iterate every message
for msg in reader.messages()? {
    let msg = msg?;
    println!("{} @ {} ns  ({} bytes)",
        msg.connection.topic, msg.timestamp, msg.data.len());
}

reader.close()?;

Filter by topic

use apex_io::rosbag::Reader;

let mut reader = Reader::new("path/to/my_bag")?;
reader.open()?;

// Select connections whose topic matches the filter
let conns = reader.connections();
let imu_conns: Vec<_> = conns.iter()
    .filter(|c| c.topic == "/imu/data")
    .cloned()
    .collect();

for msg in reader.messages_filtered(Some(&imu_conns), None, None)? {
    let msg = msg?;
    println!("IMU @ {} ns", msg.timestamp);
}

Filter by time range

use apex_io::rosbag::Reader;

let mut reader = Reader::new("path/to/my_bag")?;
reader.open()?;

// Times are in nanoseconds since epoch
let start_ns: u64 = 1_700_000_000_000_000_000;
let end_ns:   u64 = 1_700_000_005_000_000_000; // 5 seconds later

for msg in reader.messages_filtered(None, Some(start_ns), Some(end_ns))? {
    let msg = msg?;
    println!("{} @ {} ns", msg.connection.topic, msg.timestamp);
}

Combined: topic + time filter

use apex_io::rosbag::Reader;

let mut reader = Reader::new("path/to/my_bag")?;
reader.open()?;

let conns = reader.connections();
let cam_conns: Vec<_> = conns.iter()
    .filter(|c| c.topic.starts_with("/camera"))
    .cloned()
    .collect();

let start_ns = reader.start_time();
let end_ns   = start_ns + 10_000_000_000; // first 10 seconds

for msg in reader.messages_filtered(Some(&cam_conns), Some(start_ns), Some(end_ns))? {
    let msg = msg?;
    println!("{} @ {} ns  {} bytes", msg.connection.topic, msg.timestamp, msg.data.len());
}

High-performance raw mode (no deserialization)

use apex_io::rosbag::Reader;

let mut reader = Reader::new("path/to/my_bag")?;
reader.open()?;

// raw_messages_filtered skips CDR deserialization — useful for copying or forwarding
for raw in reader.raw_messages_filtered(None, None, None)? {
    let raw = raw?;
    println!("raw {} bytes on {}", raw.raw_data.len(), raw.connection.topic);
}

// Or batch-collect everything at once (single allocation, no iterator overhead)
let batch = reader.read_raw_messages_batch(None, None, None)?;
println!("Loaded {} raw messages", batch.len());

Fast metadata-only reading

use apex_io::rosbag::read_bag_metadata_fast;

// Reads only metadata.yaml — never opens the .db3 or .mcap file
let meta = read_bag_metadata_fast("path/to/my_bag")?;
println!("Duration: {:.2}s", meta.duration() as f64 / 1e9);
println!("Messages: {}", meta.message_count());
println!("Storage:  {}", meta.info().storage_identifier);

for topic in &meta.info().topics_with_message_count {
    println!("  {} [{}]  {} msgs",
        topic.topic_metadata.name,
        topic.topic_metadata.message_type,
        topic.message_count);
}

ROS2 Bag Writing

Write a minimal bag

use apex_io::rosbag::{Writer, StoragePlugin};

let mut writer = Writer::new(
    "output_bag",
    None,                        // latest bag format version
    Some(StoragePlugin::Sqlite3),
)?;
writer.open()?;

// Register a topic before writing to it
let conn = writer.add_connection(
    "/my_topic".to_string(),
    "std_msgs/msg/String".to_string(),
    None,   // message definition (auto-resolved if None)
    None,   // type description hash
    None,   // serialization format (defaults to "cdr")
    None,   // QoS profiles
)?;

// Write raw CDR-serialized bytes with a nanosecond timestamp
let timestamp_ns: u64 = 1_700_000_000_000_000_000;
writer.write(&conn, timestamp_ns, b"\x00\x01\x00\x00\x06\x00\x00\x00hello\x00")?;

writer.close()?;

Write to MCAP format with zstd compression

use apex_io::rosbag::{Writer, StoragePlugin, CompressionMode, CompressionFormat};

let mut writer = Writer::new("output_bag", None, Some(StoragePlugin::Mcap))?;
writer.open()?;

// Enable file-level zstd compression
writer.set_compression(CompressionMode::File, CompressionFormat::Zstd)?;

let conn = writer.add_connection(
    "/lidar/points".to_string(),
    "sensor_msgs/msg/PointCloud2".to_string(),
    None, None, None, None,
)?;

for (i, payload) in payloads.iter().enumerate() {
    let ts = 1_700_000_000_000_000_000u64 + i as u64 * 100_000_000; // 100 ms apart
    writer.write(&conn, ts, payload)?;
}

writer.close()?;

Write multiple topics

use apex_io::rosbag::{Writer, StoragePlugin};

let mut writer = Writer::new("multi_topic_bag", None, Some(StoragePlugin::Sqlite3))?;
writer.open()?;

let imu_conn = writer.add_connection(
    "/imu/data".to_string(),
    "sensor_msgs/msg/Imu".to_string(),
    None, None, None, None,
)?;

let odom_conn = writer.add_connection(
    "/odom".to_string(),
    "nav_msgs/msg/Odometry".to_string(),
    None, None, None, None,
)?;

// Interleave messages from different topics
writer.write(&imu_conn,  1_000_000_000, &imu_bytes)?;
writer.write(&odom_conn, 1_050_000_000, &odom_bytes)?;
writer.write(&imu_conn,  1_100_000_000, &imu_bytes2)?;

writer.close()?;

Live DDS Subscription

Enable the dds feature and have a running ROS2 node on the same DDS domain.

apex-io = { version = "0.2.0", features = ["dds"] }

Subscribe to a single topic

use apex_io::dds::{DdsSubscriber, DdsSubscriberConfig};
use apex_io::rosbag::{QosReliability, QosDurability};

let config = DdsSubscriberConfig {
    topic: "/imu/data".to_string(),
    message_type: "sensor_msgs/msg/Imu".to_string(),
    reliability: QosReliability::Reliable,
    durability: QosDurability::Volatile,
    history_depth: 10,
    domain_id: 0,
    channel_capacity: 128,
};

let subscriber = DdsSubscriber::new(config)?;

// `listen` spawns a background thread and returns a channel receiver
let rx = subscriber.listen()?;

// Process messages as they arrive
for raw_msg in rx {
    println!("Received {} bytes on {} @ {} ns",
        raw_msg.raw_data.len(),
        raw_msg.connection.topic,
        raw_msg.timestamp);
}

Subscribe to multiple topics

Use a thread per topic and collect via a shared channel, or use the dds_multi_listener binary:

use apex_io::dds::{DdsSubscriber, DdsSubscriberConfig};
use std::sync::mpsc;
use std::thread;

let topics = vec![
    ("/imu/data", "sensor_msgs/msg/Imu"),
    ("/odom",     "nav_msgs/msg/Odometry"),
    ("/tf",       "tf2_msgs/msg/TFMessage"),
];

let (tx, rx) = mpsc::channel();

for (topic, msg_type) in topics {
    let tx = tx.clone();
    let config = DdsSubscriberConfig {
        topic: topic.to_string(),
        message_type: msg_type.to_string(),
        reliability: Default::default(),
        durability: Default::default(),
        history_depth: 10,
        domain_id: 0,
        channel_capacity: 64,
    };
    thread::spawn(move || {
        let sub = DdsSubscriber::new(config).unwrap();
        let sub_rx = sub.listen().unwrap();
        for msg in sub_rx {
            tx.send(msg).unwrap();
        }
    });
}

// Unified receive loop across all topics
for msg in rx {
    println!("{} @ {} ns", msg.connection.topic, msg.timestamp);
}

Topic name conversion

ROS2 topic names are automatically converted to DDS topic names:

use apex_io::dds::DdsSubscriber;

// "/imu/data"  →  "rt/imu/data"
let dds_topic = DdsSubscriber::ros2_to_dds_topic("/imu/data");

Pose Graph Formats

G2O Format

The G2O (General Graph Optimization) format supports both 2D (SE2) and 3D (SE3) pose graphs.

File structure:

VERTEX_SE2 id x y theta
VERTEX_SE3:QUAT id x y z qx qy qz qw
EDGE_SE2 from to dx dy dtheta info_xx info_xy info_yy info_xt info_yt info_tt
EDGE_SE3:QUAT from to dx dy dz dqx dqy dqz dqw [21 info matrix values]
use apex_io::{G2oLoader, GraphLoader};

// Load
let graph = G2oLoader::load("data/sphere2500.g2o")?;
println!("{} vertices, {} edges", graph.vertex_count(), graph.edge_count());

// Iterate SE3 vertices
for (id, v) in &graph.vertices_se3 {
    println!("Vertex {}: ({:.2}, {:.2}, {:.2})", id, v.x(), v.y(), v.z());
}

// Iterate SE3 edges
for edge in &graph.edges_se3 {
    println!("Edge {}{}", edge.from, edge.to);
}

// Write back
G2oLoader::write(&graph, "output.g2o")?;

TORO Format

TORO supports SE2 (2D) graphs only. Writing SE3 data returns an error.

File structure:

VERTEX2 id x y theta
EDGE2 from to dx dy dtheta info_xx info_xy info_yy info_xt info_yt info_tt
use apex_io::{ToroLoader, GraphLoader};

let graph = ToroLoader::load("data/intel.graph")?;
for (id, v) in &graph.vertices_se2 {
    println!("Vertex {}: ({:.2}, {:.2}, {:.2} rad)", id, v.x(), v.y(), v.theta());
}
ToroLoader::write(&graph, "output.graph")?;

Auto-detect format

use apex_io::load_graph;

let graph = load_graph("data/M3500.g2o")?;   // .g2o  → G2oLoader
let graph = load_graph("data/intel.graph")?; // .graph → ToroLoader

BAL Format (Bundle Adjustment)

File structure:

num_cameras num_points num_observations
camera_idx point_idx pixel_x pixel_y  # one observation per line
...
# then 9 camera parameters per camera (rotation x/y/z, translation x/y/z, focal, k1, k2)
# then 3 point coordinates per point (x, y, z)
use apex_io::BalLoader;

let dataset = BalLoader::load("problem-1778-993923-pre.txt")?;
println!("{} cameras, {} points, {} observations",
    dataset.cameras.len(), dataset.points.len(), dataset.observations.len());

for (i, cam) in dataset.cameras.iter().enumerate() {
    println!("Camera {}: f={:.1} k1={:.4} k2={:.4}", i, cam.focal_length, cam.k1, cam.k2);
}
for obs in &dataset.observations {
    println!("Camera {} sees point {} at ({:.1}, {:.1})",
        obs.camera_index, obs.point_index, obs.x, obs.y);
}

CLI Binaries

All binaries ship with the crate. Run them via:

cargo run -p apex-io --bin <name> -- <args>

bag_info — inspect bag metadata (fast, no storage file opened)

cargo run -p apex-io --bin bag_info -- path/to/my_bag

Reads only metadata.yaml. Output: version, storage format, compression, file sizes, duration, start/end timestamps, message count, and per-topic breakdown.

bag_filter — copy and filter a bag

# Filter by topic
cargo run -p apex-io --bin bag_filter -- input_bag output_bag \
    --topics /camera/image_raw,/imu/data

# Filter by time range (nanoseconds since epoch)
cargo run -p apex-io --bin bag_filter -- input_bag output_bag \
    --start 1700000000000000000 \
    --end   1700000010000000000

# Convert SQLite3 → MCAP
cargo run -p apex-io --bin bag_filter -- input_bag output_bag \
    --storage mcap

# All options combined with compression
cargo run -p apex-io --bin bag_filter -- input_bag output_bag \
    --topics /lidar/points \
    --start 1700000000000000000 \
    --end   1700000060000000000 \
    --storage mcap \
    --compression-mode file \
    --compression-format zstd

extract_topic_data — export to CSV or PNG

# Extract any topic to CSV (one row per message)
cargo run -p apex-io --bin extract_topic_data -- \
    path/to/my_bag /imu/data output_folder/

# Extract image topic to PNG files (one file per frame)
cargo run -p apex-io --bin extract_topic_data -- \
    path/to/my_bag /camera/image_raw output_folder/

write_dummy_bag — create a demo bag

# Write to ./demo_bag with 29+ message types
cargo run -p apex-io --bin write_dummy_bag

# Custom output path
cargo run -p apex-io --bin write_dummy_bag -- /tmp/my_demo_bag

download_datasets — download public SLAM datasets

cargo run -p apex-io --bin download_datasets

Downloads G2O, TORO, and BAL benchmark datasets configured in datasets.toml.

dds_multi_listener — live multi-topic DDS listener (dds feature required)

cargo run -p apex-io --features dds --bin dds_multi_listener -- \
    --topics /imu/data,/odom,/tf \
    --domain-id 0

Connects to a running ROS2 system and prints incoming messages. Requires a compatible DDS runtime (CycloneDDS, FastDDS) and active ROS2 nodes on the same domain.


Data Structures

Graph

pub struct Graph {
    pub vertices_se2: HashMap<usize, VertexSE2>,
    pub vertices_se3: HashMap<usize, VertexSE3>,
    pub edges_se2:    Vec<EdgeSE2>,
    pub edges_se3:    Vec<EdgeSE3>,
}
impl Graph {
    pub fn vertex_count(&self) -> usize;
    pub fn edge_count(&self) -> usize;
}

ROS2 bag core types

pub struct Connection {
    pub id:                   u32,
    pub topic:                String,        // "/imu/data"
    pub message_type:         String,        // "sensor_msgs/msg/Imu"
    pub serialization_format: String,        // "cdr"
    pub offered_qos_profiles: Vec<QosProfile>,
    // ...
}

pub struct Message {
    pub connection: Connection,
    pub topic:      String,
    pub timestamp:  u64,      // nanoseconds since epoch
    pub data:       Vec<u8>,  // CDR-serialized bytes
}

pub struct RawMessage {
    pub connection: Connection,
    pub timestamp:  u64,
    pub raw_data:   Vec<u8>,
}

pub enum StoragePlugin     { Sqlite3, Mcap }
pub enum CompressionMode   { None, Message, File, Storage }
pub enum CompressionFormat { None, Zstd }

QoS Profile

pub struct QosProfile {
    pub history:                     QosHistory,      // KeepLast | KeepAll | SystemDefault
    pub depth:                       u32,
    pub reliability:                 QosReliability,  // Reliable | BestEffort | SystemDefault
    pub durability:                  QosDurability,   // TransientLocal | Volatile | SystemDefault
    pub deadline:                    QosTime,
    pub lifespan:                    QosTime,
    pub liveliness:                  QosLiveliness,
    pub liveliness_lease_duration:   QosTime,
    pub avoid_ros_namespace_conventions: bool,
}

Performance

Technique Applied to
Memory-mapped file I/O (memmap2) All loaders — avoids loading full file into heap
Parallel parsing (rayon) Files > 1 000 lines — multi-core acceleration
Raw message API Bag reading without CDR deserialization overhead
Batch read read_raw_messages_batch — single allocation, no iterator overhead
Metadata-only path read_bag_metadata_fast — zero storage file I/O
Pre-allocated collections Estimated capacity from file size to minimize reallocs

Error Handling

Pose graph — IoError

Variant Description
Io(io::Error) Underlying I/O error
Parse { line, message } Parse error with line number
UnsupportedVertexType(String) Unknown vertex type in file
UnsupportedEdgeType(String) Unknown edge type in file
InvalidNumber { line, value } Failed to parse a number
MissingFields { line } Insufficient fields on a line
DuplicateVertex { id } Vertex ID collision
InvalidQuaternion { line, norm } Non-unit quaternion
UnsupportedFormat(String) Unrecognized file extension
FileCreationFailed { path, reason } Output file creation failed

ROS2 bag — BagError

BagError covers metadata parsing, storage backend (SQLite3/MCAP), CDR deserialization, compression, and connection mismatch failures. All variants include enough context (file path, topic, byte offset) for actionable diagnostics.


Visualization Feature

apex-io = { version = "0.2.0", features = ["visualization"] }
// SE2 vertex → Rerun types
let pos_2d: [f32; 2] = vertex_se2.to_rerun_position_2d(scale);
let pos_3d: Vec3     = vertex_se2.to_rerun_position_3d(scale, height);

// SE3 vertex → Rerun types
let (position, rotation): (Vec3, Quat) = vertex_se3.to_rerun_transform(scale);

Dependencies

Crate Purpose
apex-manifolds Lie group types (SE2, SE3)
nalgebra Linear algebra (vectors, matrices)
memmap2 Memory-mapped file I/O
rayon Parallel parsing
thiserror Error type derivation
tracing Structured logging
serde, serde_json Serialization
serde_yaml YAML metadata parsing (ROS2 bags)
chrono Timestamps in file headers
byteorder CDR byte-order handling
bytes Efficient byte buffers
hex Hex encoding for diagnostics
image PNG export for image topics
rusqlite SQLite3 storage backend
zstd Compression support
mcap MCAP storage backend
clap CLI argument parsing
ureq Dataset download
rerun Visualization (optional — visualization feature)
rustdds, tokio, futures Live DDS subscription (optional — dds feature)

References

License

Apache-2.0