mt_wind 0.7.0

Proxy nodes to publish Bagfile data from Minot to other publishing services.
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Mutex;

use mt_bagread::qos::{
    RmwQosDurabilityPolicy, RmwQosHistoryPolicy, RmwQosLivelinessPolicy, RmwQosReliabilityPolicy,
};
use r2r::{
    WrappedTypesupport,
    builtin_interfaces::msg::Time,
    geometry_msgs::msg::{
        Point, Pose, PoseWithCovariance, Quaternion, Twist, TwistWithCovariance, Vector3,
    },
    qos::{DurabilityPolicy, HistoryPolicy, LivelinessPolicy, ReliabilityPolicy},
    sensor_msgs::msg::PointField,
    std_msgs::msg::Header,
};

use anyhow::{Context, anyhow};
use log::{debug, error, info, warn};
use mt_sea::{Ship, ShipKind};
use tokio::sync::mpsc::UnboundedReceiver;

use r2r::nav_msgs::msg::Odometry;
use r2r::sensor_msgs::msg::{Imu, PointCloud2};

pub async fn wind(name: &str) -> anyhow::Result<UnboundedReceiver<Vec<mt_sea::WindData>>> {
    let kind = ShipKind::Wind(name.to_string());
    let ship =
        mt_sea::ship::NetworkShipImpl::init(kind.clone(), false, mt_sea::Qos::Reliable).await?;
    info!("Wind initialized with ship {:?}", kind);

    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    tokio::spawn(async move {
        loop {
            match ship.wait_for_wind().await {
                Ok(wind_data) => {
                    tx.send(wind_data).unwrap();
                }
                Err(e) => {
                    error!("{}", e);
                    continue;
                }
            }
        }
    });

    Ok(rx)
}

#[derive(Copy, Clone, Debug, Default)]
pub enum Qos {
    Sensor,
    #[default]
    SystemDefault,
}

impl TryFrom<String> for Qos {
    type Error = anyhow::Error;

    fn try_from(value: String) -> Result<Self, Self::Error> {
        match value.as_str() {
            "sensor" => Ok(Qos::Sensor),
            "system_default" => Ok(Qos::SystemDefault),
            _ => Err(anyhow!("Invalid QoS value")),
        }
    }
}

impl From<Qos> for r2r::QosProfile {
    fn from(value: Qos) -> Self {
        match value {
            Qos::Sensor => r2r::QosProfile::sensor_data(),
            Qos::SystemDefault => r2r::QosProfile::system_default(),
        }
    }
}

#[derive(Clone, Debug)]
pub struct QosR2RMap(r2r::QosProfile);

impl TryFrom<mt_net::Qos> for QosR2RMap {
    type Error = anyhow::Error;
    fn try_from(value: mt_net::Qos) -> Result<Self, Self::Error> {
        Ok(match value {
            mt_net::Qos::Sensor => Self(r2r::QosProfile::sensor_data()),
            mt_net::Qos::SystemDefault => Self(r2r::QosProfile::system_default()),
            mt_net::Qos::Custom(q) => {
                let deadline = std::time::Duration::from_secs(q.deadline.sec)
                    + std::time::Duration::from_nanos(q.deadline.nsec);
                let lifespan = std::time::Duration::from_secs(q.lifespan.sec)
                    + std::time::Duration::from_nanos(q.lifespan.nsec);

                let reliability = match RmwQosReliabilityPolicy::from_str(&q.reliability)? {
                    RmwQosReliabilityPolicy::SystemDefault => ReliabilityPolicy::SystemDefault,
                    RmwQosReliabilityPolicy::Reliable => ReliabilityPolicy::Reliable,
                    RmwQosReliabilityPolicy::BestEffort => ReliabilityPolicy::BestEffort,
                    RmwQosReliabilityPolicy::Unknown => ReliabilityPolicy::Unknown,
                    #[cfg(not(feature = "humble"))]
                    RmwQosReliabilityPolicy::BestAvailable => ReliabilityPolicy::BestAvailable,
                };

                let durability = match RmwQosDurabilityPolicy::from_str(&q.durability)? {
                    RmwQosDurabilityPolicy::SystemDefault => DurabilityPolicy::SystemDefault,
                    RmwQosDurabilityPolicy::TransientLocal => DurabilityPolicy::TransientLocal,
                    RmwQosDurabilityPolicy::Volatile => DurabilityPolicy::Volatile,
                    RmwQosDurabilityPolicy::Unknown => DurabilityPolicy::Unknown,
                    #[cfg(not(feature = "humble"))]
                    RmwQosDurabilityPolicy::BestAvailable => DurabilityPolicy::BestAvailable,
                };

                let history = match RmwQosHistoryPolicy::from_str(&q.history)? {
                    RmwQosHistoryPolicy::SystemDefault => HistoryPolicy::SystemDefault,
                    RmwQosHistoryPolicy::KeepLast => HistoryPolicy::KeepLast,
                    RmwQosHistoryPolicy::KeepAll => HistoryPolicy::KeepAll,
                    RmwQosHistoryPolicy::Unknown => HistoryPolicy::Unknown,
                };

                let depth = q.depth.max(0) as usize;

                let liveliness_dur =
                    std::time::Duration::from_secs(q.liveliness_lease_duration.sec)
                        + std::time::Duration::from_nanos(q.liveliness_lease_duration.nsec);
                let liveliness = match RmwQosLivelinessPolicy::from_str(&q.liveliness)? {
                    RmwQosLivelinessPolicy::SystemDefault => LivelinessPolicy::SystemDefault,
                    RmwQosLivelinessPolicy::Automatic => LivelinessPolicy::Automatic,
                    RmwQosLivelinessPolicy::ManualByNode => LivelinessPolicy::ManualByNode,
                    RmwQosLivelinessPolicy::ManualByTopic => LivelinessPolicy::ManualByTopic,
                    RmwQosLivelinessPolicy::Unknown => LivelinessPolicy::Unknown,
                    #[cfg(not(feature = "humble"))]
                    RmwQosLivelinessPolicy::BestAvailable => LivelinessPolicy::BestAvailable,
                };

                // let lease_dur = q.liveliness_lease_duration
                Self(r2r::QosProfile {
                    history,
                    depth,
                    reliability,
                    durability,
                    deadline,
                    lifespan,
                    liveliness,
                    liveliness_lease_duration: liveliness_dur,
                    avoid_ros_namespace_conventions: false,
                })
            }
        })
    }
}
pub async fn run_dyn_wind(
    wind_name: &str,
    ready: tokio::sync::oneshot::Sender<()>,
) -> anyhow::Result<()> {
    let node_name = wind_name.to_owned() + "_node";
    let ctx = r2r::Context::create()?;
    let node = r2r::Node::create(ctx, &node_name, "")?;
    let node = std::sync::Arc::new(Mutex::new(node));

    let mut publishers: HashMap<String, r2r::PublisherUntyped> = HashMap::new();

    let mut wind_receiver = wind(wind_name).await?;

    if let Err(_) = ready.send(()) {
        log::warn!("ros2_r2r wind could not signal to be ready to handle requests");
    }

    while let Some(wind_data) = wind_receiver.recv().await {
        for data in wind_data {
            let qos = match data.qos {
                Some(qos) => QosR2RMap::try_from(qos)?.0,
                None => {
                    warn!("Received message without QOS, choosing system default");
                    // TODO debounce warning or only use info
                    Qos::default().into()
                }
            };
            let mut existing_pubber = publishers.get(&data.topic);
            if existing_pubber.is_none() {
                let mut lock = node.lock().unwrap();
                let pubber = lock.create_publisher_untyped(&data.topic, &data.msg_type, qos)?;

                publishers.insert(data.topic.clone(), pubber);
                existing_pubber = Some(
                    publishers
                        .get(&data.topic)
                        .expect("Just inserted the line before"),
                );
            }
            let pubber = existing_pubber.expect("Should be inserted manually if not exists.");
            match data.data {
                mt_net::SensorTypeMapped::Lidar(l) => {
                    let native_t = PointCloud2 {
                        header: Header {
                            stamp: Time {
                                sec: l.header.stamp.sec,
                                nanosec: l.header.stamp.nanosec,
                            },
                            frame_id: l.header.frame_id,
                        },
                        height: l.height,
                        width: l.width,
                        fields: l
                            .fields
                            .into_iter()
                            .map(|f| PointField {
                                name: f.name,
                                offset: f.offset,
                                datatype: f.datatype,
                                count: f.count,
                            })
                            .collect(),
                        is_bigendian: l.is_bigendian,
                        point_step: l.point_step,
                        row_step: l.row_step,
                        data: l.data,
                        is_dense: l.is_dense,
                    };
                    let raw = native_t
                        .to_serialized_bytes()
                        .context("Error encoding CDR")?;

                    pubber.publish_raw(&raw)?;

                    debug!("published cloud");
                }
                mt_net::SensorTypeMapped::Imu(imu) => {
                    let native_t = Imu {
                        header: Header {
                            stamp: Time {
                                sec: imu.header.stamp.sec,
                                nanosec: imu.header.stamp.nanosec,
                            },
                            frame_id: imu.header.frame_id,
                        },
                        orientation: Quaternion {
                            x: imu.orientation.x,
                            y: imu.orientation.y,
                            z: imu.orientation.z,
                            w: imu.orientation.w,
                        },
                        orientation_covariance: imu.orientation_covariance.to_vec(),
                        angular_velocity: Vector3 {
                            x: imu.angular_velocity.x,
                            y: imu.angular_velocity.y,
                            z: imu.angular_velocity.z,
                        },
                        angular_velocity_covariance: imu.angular_velocity_covariance.to_vec(),
                        linear_acceleration: Vector3 {
                            x: imu.linear_acceleration.x,
                            y: imu.linear_acceleration.y,
                            z: imu.linear_acceleration.z,
                        },
                        linear_acceleration_covariance: imu.linear_acceleration_covariance.to_vec(),
                    };
                    let raw = native_t
                        .to_serialized_bytes()
                        .context("Error encoding CDR: {e}")?;

                    pubber.publish_raw(&raw)?;
                    debug!("published imu");
                }
                mt_net::SensorTypeMapped::Any(raw_data) => {
                    pubber.publish_raw(&raw_data)?;
                    debug!("published raw");
                }
                mt_net::SensorTypeMapped::Odometry(odometry) => {
                    let native_t = Odometry {
                        header: Header {
                            stamp: Time {
                                sec: odometry.header.stamp.sec,
                                nanosec: odometry.header.stamp.nanosec,
                            },
                            frame_id: odometry.header.frame_id,
                        },
                        child_frame_id: odometry.child_frame_id,
                        pose: PoseWithCovariance {
                            pose: Pose {
                                position: Point {
                                    x: odometry.pose.pose.position.x,
                                    y: odometry.pose.pose.position.y,
                                    z: odometry.pose.pose.position.z,
                                },
                                orientation: Quaternion {
                                    x: odometry.pose.pose.orientation.x,
                                    y: odometry.pose.pose.orientation.y,
                                    z: odometry.pose.pose.orientation.z,
                                    w: odometry.pose.pose.orientation.w,
                                },
                            },
                            covariance: odometry.pose.covariance.to_vec(),
                        },
                        twist: TwistWithCovariance {
                            twist: Twist {
                                linear: Vector3 {
                                    x: odometry.twist.twist.linear.x,
                                    y: odometry.twist.twist.linear.y,
                                    z: odometry.twist.twist.linear.z,
                                },
                                angular: Vector3 {
                                    x: odometry.twist.twist.angular.x,
                                    y: odometry.twist.twist.angular.y,
                                    z: odometry.twist.twist.angular.z,
                                },
                            },
                            covariance: odometry.twist.covariance.to_vec(),
                        },
                    };
                    let raw = native_t
                        .to_serialized_bytes()
                        .context("Error encoding CDR")?;

                    pubber.publish_raw(&raw)?;

                    debug!("published odom");
                }
            }
        }
    }

    Ok(())
}

pub fn get_env_or_default(key: &str, default: &str) -> anyhow::Result<String> {
    match std::env::var(key) {
        Ok(name) => Ok(name),
        Err(e) => match e {
            std::env::VarError::NotPresent => Ok(default.to_owned()),
            std::env::VarError::NotUnicode(_os_string) => Err(anyhow!(
                "Could not fetch env variable because it is not unicode"
            )),
        },
    }
}

#[tokio::main]
#[allow(dead_code)]
async fn main() -> anyhow::Result<()> {
    let env = env_logger::Env::new().filter_or("WIND_LOG", "info");
    env_logger::Builder::from_env(env).init();

    let wind_name = get_env_or_default("WIND_NAME", "turbine_ros2_c")?;

    let (tx, rx) = tokio::sync::oneshot::channel();
    tokio::spawn(async move {
        // dump ready answer
        _ = rx.await;
    });
    run_dyn_wind(&wind_name, tx).await?; // will never return

    Ok(())
}