transforms_io 0.1.0

IO layer for the transforms crate with ROS2 and Minot.
Documentation
//! r2r backend implementation.
//!
//! Two modes available:
//! - `r2r` feature: Uses futures-lite with blocking threads (default)
//! - `r2r-async` feature: Uses futures for custom async runtimes

use std::sync::{Arc, RwLock};
use std::time::Duration;

use nalgebra::geometry::{Isometry3, Translation3, UnitQuaternion};
use transforms::geometry::{Quaternion, Transform, Vector3};
use transforms::time::Timestamp;
use transforms::Registry;

use crate::error::TfError;

/// TF Broadcaster for r2r.
pub struct TfBroadcaster {
    publisher: r2r::Publisher<r2r::tf2_msgs::msg::TFMessage>,
    static_publisher: r2r::Publisher<r2r::tf2_msgs::msg::TFMessage>,
}

impl TfBroadcaster {
    /// Create a new TF broadcaster.
    #[track_caller]
    pub fn new(node: &mut r2r::Node) -> Self {
        let publisher = node
            .create_publisher("/tf", r2r::QosProfile::default())
            .expect("Failed to create /tf publisher");

        let static_publisher = node
            .create_publisher(
                "/tf_static",
                r2r::QosProfile::parameters()
                    .durability(r2r::qos::DurabilityPolicy::TransientLocal)
                    .reliability(r2r::qos::ReliabilityPolicy::Reliable)
                    .history(r2r::qos::HistoryPolicy::KeepLast)
                    .keep_last(1),
            )
            .expect("Failed to create /tf_static publisher");

        Self {
            publisher,
            static_publisher,
        }
    }

    /// Broadcast a transform.
    pub fn send_transform(
        &self,
        transform: r2r::geometry_msgs::msg::TransformStamped,
    ) -> Result<(), TfError> {
        let msg = r2r::tf2_msgs::msg::TFMessage {
            transforms: vec![transform],
        };
        self.publisher
            .publish(&msg)
            .map_err(|e| TfError::Publish(e.to_string()))
    }

    /// Broadcast a static transform.
    pub fn send_static_transform(
        &self,
        transform: r2r::geometry_msgs::msg::TransformStamped,
    ) -> Result<(), TfError> {
        let msg = r2r::tf2_msgs::msg::TFMessage {
            transforms: vec![transform],
        };
        self.static_publisher
            .publish(&msg)
            .map_err(|e| TfError::Publish(e.to_string()))
    }
}

/// TF Listener for r2r.
pub struct TfListener {
    registry: Arc<RwLock<Registry<Timestamp>>>,
}

/// Default buffer duration in seconds.
pub const DEFAULT_BUFFER_DURATION_SECS: u64 = 10;

impl TfListener {
    /// Look up a transform at a specific time.
    pub fn lookup_transform(
        &self,
        source_frame: &str,
        target_frame: &str,
        time: r2r::builtin_interfaces::msg::Time,
    ) -> Result<r2r::geometry_msgs::msg::TransformStamped, TfError> {
        let timestamp = ros_time_to_timestamp(&time);

        let reg = self.registry.read().map_err(|_| TfError::Lock)?;

        // Handle zero time (get latest)
        let ts = if time.sec == 0 && time.nanosec == 0 {
            Timestamp::now()
        } else {
            timestamp
        };

        let transform = reg
            .get_transform(source_frame, target_frame, ts)
            .map_err(|e| TfError::Lookup(e.to_string()))?;

        Ok(transform_to_ros(&transform, time))
    }
}

// ============================================================================
// Blocking implementation (futures-lite with threads)
// ============================================================================

#[cfg(feature = "r2r")]
mod blocking {
    use super::*;
    use std::thread;

    impl TfListener {
        /// Create a new TF listener with default buffer duration (10 seconds).
        ///
        /// Uses blocking threads with futures-lite runtime.
        #[track_caller]
        pub fn new(node: &mut r2r::Node) -> Self {
            Self::with_buffer_duration(node, Duration::from_secs(DEFAULT_BUFFER_DURATION_SECS))
        }

        /// Create a new TF listener with custom buffer duration.
        ///
        /// Uses blocking threads with futures-lite runtime.
        #[track_caller]
        pub fn with_buffer_duration(node: &mut r2r::Node, buffer_duration: Duration) -> Self {
            use futures_lite::StreamExt;

            let registry = Arc::new(RwLock::new(Registry::new(buffer_duration)));

            // Subscribe to /tf
            let mut dynamic_sub = node
                .subscribe::<r2r::tf2_msgs::msg::TFMessage>("/tf", r2r::QosProfile::default())
                .expect("Failed to subscribe to /tf");

            let registry_dyn = registry.clone();
            thread::spawn(move || {
                futures_lite::future::block_on(async {
                    while Arc::strong_count(&registry_dyn) > 1 {
                        while let Some(msg) = dynamic_sub.next().await {
                            if let Ok(mut reg) = registry_dyn.write() {
                                for tf in msg.transforms {
                                    let transform = ros_to_transform(&tf);
                                    reg.add_transform(transform);
                                }
                            }
                        }
                    }
                });
            });

            // Subscribe to /tf_static
            let mut static_sub = node
                .subscribe::<r2r::tf2_msgs::msg::TFMessage>(
                    "/tf_static",
                    r2r::QosProfile::parameters()
                        .durability(r2r::qos::DurabilityPolicy::TransientLocal)
                        .reliability(r2r::qos::ReliabilityPolicy::Reliable)
                        .history(r2r::qos::HistoryPolicy::KeepLast)
                        .keep_last(1),
                )
                .expect("Failed to subscribe to /tf_static");

            let registry_static = registry.clone();
            thread::spawn(move || {
                futures_lite::future::block_on(async {
                    while Arc::strong_count(&registry_static) > 1 {
                        while let Some(msg) = static_sub.next().await {
                            if let Ok(mut reg) = registry_static.write() {
                                for tf in msg.transforms {
                                    let transform = ros_to_transform(&tf);
                                    reg.add_transform(transform);
                                }
                            }
                        }
                    }
                });
            });

            Self { registry }
        }
    }
}

// ============================================================================
// Async implementation (generic futures)
// ============================================================================

#[cfg(feature = "r2r-async")]
mod async_impl {
    use super::*;
    use futures::StreamExt;

    impl TfListener {
        /// Create a new TF listener with default buffer duration (10 seconds).
        ///
        /// Returns a handle and spawn handles for the subscriber tasks.
        /// The caller is responsible for spawning these on their async runtime.
        #[track_caller]
        pub fn new_async(
            node: &mut r2r::Node,
        ) -> (
            Self,
            impl futures::Future<Output = ()>,
            impl futures::Future<Output = ()>,
        ) {
            Self::with_buffer_duration_async(
                node,
                Duration::from_secs(DEFAULT_BUFFER_DURATION_SECS),
            )
        }

        /// Create a new TF listener with custom buffer duration.
        ///
        /// Returns the listener and two futures that must be spawned on your runtime.
        #[track_caller]
        pub fn with_buffer_duration_async(
            node: &mut r2r::Node,
            buffer_duration: Duration,
        ) -> (
            Self,
            impl futures::Future<Output = ()>,
            impl futures::Future<Output = ()>,
        ) {
            let registry = Arc::new(RwLock::new(Registry::new(buffer_duration)));

            // Subscribe to /tf
            let mut dynamic_sub = node
                .subscribe::<r2r::tf2_msgs::msg::TFMessage>("/tf", r2r::QosProfile::default())
                .expect("Failed to subscribe to /tf");

            let registry_dyn = registry.clone();
            let dynamic_future = async move {
                while Arc::strong_count(&registry_dyn) > 1 {
                    while let Some(msg) = dynamic_sub.next().await {
                        if let Ok(mut reg) = registry_dyn.write() {
                            for tf in msg.transforms {
                                let transform = ros_to_transform(&tf);
                                reg.add_transform(transform);
                            }
                        }
                    }
                }
            };

            // Subscribe to /tf_static
            let mut static_sub = node
                .subscribe::<r2r::tf2_msgs::msg::TFMessage>(
                    "/tf_static",
                    r2r::QosProfile::parameters()
                        .durability(r2r::qos::DurabilityPolicy::TransientLocal)
                        .reliability(r2r::qos::ReliabilityPolicy::Reliable)
                        .history(r2r::qos::HistoryPolicy::KeepLast)
                        .keep_last(1),
                )
                .expect("Failed to subscribe to /tf_static");

            let registry_static = registry.clone();
            let static_future = async move {
                while Arc::strong_count(&registry_static) > 1 {
                    while let Some(msg) = static_sub.next().await {
                        if let Ok(mut reg) = registry_static.write() {
                            for tf in msg.transforms {
                                let transform = ros_to_transform(&tf);
                                reg.add_transform(transform);
                            }
                        }
                    }
                }
            };

            (Self { registry }, dynamic_future, static_future)
        }
    }
}

/// Convert ROS TransformStamped to transforms::Transform.
fn ros_to_transform(tf: &r2r::geometry_msgs::msg::TransformStamped) -> Transform {
    Transform {
        translation: Vector3::new(
            tf.transform.translation.x,
            tf.transform.translation.y,
            tf.transform.translation.z,
        ),
        rotation: Quaternion {
            w: tf.transform.rotation.w,
            x: tf.transform.rotation.x,
            y: tf.transform.rotation.y,
            z: tf.transform.rotation.z,
        },
        timestamp: ros_time_to_timestamp(&tf.header.stamp),
        parent: tf.header.frame_id.clone(),
        child: tf.child_frame_id.clone(),
    }
}

/// Convert transforms::Transform to ROS TransformStamped.
fn transform_to_ros(
    tf: &Transform,
    time: r2r::builtin_interfaces::msg::Time,
) -> r2r::geometry_msgs::msg::TransformStamped {
    r2r::geometry_msgs::msg::TransformStamped {
        header: r2r::std_msgs::msg::Header {
            stamp: time,
            frame_id: tf.parent.clone(),
        },
        child_frame_id: tf.child.clone(),
        transform: r2r::geometry_msgs::msg::Transform {
            translation: r2r::geometry_msgs::msg::Vector3 {
                x: tf.translation.x,
                y: tf.translation.y,
                z: tf.translation.z,
            },
            rotation: r2r::geometry_msgs::msg::Quaternion {
                x: tf.rotation.x,
                y: tf.rotation.y,
                z: tf.rotation.z,
                w: tf.rotation.w,
            },
        },
    }
}

/// Convert ROS Time to Timestamp.
fn ros_time_to_timestamp(time: &r2r::builtin_interfaces::msg::Time) -> Timestamp {
    let nanos = (time.sec as u128) * 1_000_000_000 + (time.nanosec as u128);
    Timestamp { t: nanos }
}

/// Convert a ROS Transform to an Isometry3.
pub fn isometry_from_transform(tf: &r2r::geometry_msgs::msg::Transform) -> Isometry3<f64> {
    let trans = Translation3::new(tf.translation.x, tf.translation.y, tf.translation.z);
    let rot = UnitQuaternion::new_normalize(nalgebra::geometry::Quaternion::new(
        tf.rotation.w,
        tf.rotation.x,
        tf.rotation.y,
        tf.rotation.z,
    ));
    Isometry3::from_parts(trans, rot)
}