mt_wind 0.8.0

Proxy nodes to publish Bagfile data from Minot to other publishing services.
roslibrust::find_and_generate_ros_messages_without_ros_package_path!("msg/ros1");

use anyhow::anyhow;
use std::collections::HashMap;

use log::{debug, error, info};
use mt_sea::{Ship, ShipKind};
use roslibrust::{
    codegen::Time,
    ros1::{NodeHandle, RosMasterError},
};
use tokio::sync::mpsc::UnboundedReceiver;

pub async fn wind(name: &str) -> anyhow::Result<UnboundedReceiver<Vec<mt_sea::WindData>>> {
    let kind = ShipKind::Wind(name.to_string());
    let ship =
        mt_sea::ship::NetworkShipImpl::init(kind.clone(), false, mt_sea::Qos::Reliable).await?;
    info!("Wind initialized with ship {:?}", kind);

    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    tokio::spawn(async move {
        loop {
            match ship.wait_for_wind().await {
                Ok(wind_data) => {
                    tx.send(wind_data).unwrap();
                }
                Err(e) => {
                    error!("{}", e);
                    continue;
                }
            }
        }
    });

    Ok(rx)
}

pub async fn run_dyn_wind(
    master_uri: &str,
    wind_name: &str,
    ready: tokio::sync::oneshot::Sender<()>,
) -> anyhow::Result<Option<()>> {
    let nh = NodeHandle::new(&master_uri, &wind_name).await;

    let nh = match nh {
        Ok(nh) => nh,
        Err(err) => match err {
            roslibrust::ros1::NodeError::RosMasterError(
                RosMasterError::ServerCommunicationFailure(_),
            )
            | roslibrust::ros1::NodeError::RosMasterError(
                RosMasterError::HostIpResolutionFailure(_),
            ) => return Ok(None), // reports that there was no connection to the ROS1 master, print as error outside and stop here
            _ => {
                return Err(err.into());
            }
        },
    };
    let mut cloud_publishers = HashMap::new();
    let mut imu_publishers = HashMap::new();
    let mut odom_publishers = HashMap::new();

    let mut wind_receiver = wind(&wind_name).await?;

    if let Err(_) = ready.send(()) {
        log::warn!("ros1 wind could not signal to be ready to handle requests");
    }

    while let Some(wind_data) = wind_receiver.recv().await {
        for data in wind_data {
            match data.data {
                mt_net::SensorTypeMapped::Lidar(cloud_msg) => {
                    let mut existing_pubber = cloud_publishers.get(&data.topic);
                    if existing_pubber.is_none() {
                        let pubber = nh
                            .advertise::<sensor_msgs::PointCloud2>(&data.topic, 10, false)
                            .await?;
                        cloud_publishers.insert(data.topic.clone(), pubber);
                        existing_pubber = Some(
                            cloud_publishers
                                .get(&data.topic)
                                .expect("Just inserted the line before"),
                        );
                    }
                    let pubber =
                        existing_pubber.expect("Should be inserted manually if not exists.");
                    let msg = sensor_msgs::PointCloud2 {
                        header: std_msgs::Header {
                            seq: 0,
                            stamp: Time {
                                secs: cloud_msg.header.stamp.sec,
                                nsecs: i32::try_from(cloud_msg.header.stamp.nanosec)?,
                            },
                            frame_id: cloud_msg.header.frame_id,
                        },
                        height: cloud_msg.height,
                        width: cloud_msg.width,
                        fields: cloud_msg
                            .fields
                            .into_iter()
                            .map(|pf| sensor_msgs::PointField {
                                name: pf.name,
                                offset: pf.offset,
                                datatype: pf.datatype,
                                count: pf.count,
                            })
                            .collect::<Vec<_>>(),
                        is_bigendian: cloud_msg.is_bigendian,
                        point_step: cloud_msg.point_step,
                        row_step: cloud_msg.row_step,
                        data: cloud_msg.data,
                        is_dense: cloud_msg.is_dense,
                    };
                    pubber.publish(&msg).await?;
                    debug!("published cloud");
                }
                mt_net::SensorTypeMapped::Imu(imu_msg) => {
                    let mut existing_pubber = imu_publishers.get(&data.topic);
                    if existing_pubber.is_none() {
                        let pubber = nh
                            .advertise::<sensor_msgs::Imu>(&data.topic, 10, false)
                            .await?;
                        imu_publishers.insert(data.topic.clone(), pubber);
                        existing_pubber = Some(
                            imu_publishers
                                .get(&data.topic)
                                .expect("Just inserted the line before"),
                        );
                    }
                    let pubber =
                        existing_pubber.expect("Should be inserted manually if not exists.");
                    let msg = sensor_msgs::Imu {
                        header: std_msgs::Header {
                            seq: 0,
                            stamp: Time {
                                secs: imu_msg.header.stamp.sec,
                                nsecs: i32::try_from(imu_msg.header.stamp.nanosec)?,
                            },
                            frame_id: imu_msg.header.frame_id,
                        },
                        orientation: {
                            geometry_msgs::Quaternion {
                                x: imu_msg.orientation.x,
                                y: imu_msg.orientation.y,
                                z: imu_msg.orientation.z,
                                w: imu_msg.orientation.w,
                            }
                        },
                        orientation_covariance: [
                            imu_msg.orientation_covariance[0],
                            imu_msg.orientation_covariance[1],
                            imu_msg.orientation_covariance[2],
                            imu_msg.orientation_covariance[3],
                            imu_msg.orientation_covariance[4],
                            imu_msg.orientation_covariance[5],
                            imu_msg.orientation_covariance[6],
                            imu_msg.orientation_covariance[7],
                            imu_msg.orientation_covariance[8],
                        ],
                        angular_velocity: geometry_msgs::Vector3 {
                            x: imu_msg.angular_velocity.x,
                            y: imu_msg.angular_velocity.y,
                            z: imu_msg.angular_velocity.z,
                        },
                        angular_velocity_covariance: [
                            imu_msg.angular_velocity_covariance[0],
                            imu_msg.angular_velocity_covariance[1],
                            imu_msg.angular_velocity_covariance[2],
                            imu_msg.angular_velocity_covariance[3],
                            imu_msg.angular_velocity_covariance[4],
                            imu_msg.angular_velocity_covariance[5],
                            imu_msg.angular_velocity_covariance[6],
                            imu_msg.angular_velocity_covariance[7],
                            imu_msg.angular_velocity_covariance[8],
                        ],
                        linear_acceleration: geometry_msgs::Vector3 {
                            x: imu_msg.linear_acceleration.x,
                            y: imu_msg.linear_acceleration.y,
                            z: imu_msg.linear_acceleration.z,
                        },
                        linear_acceleration_covariance: [
                            imu_msg.linear_acceleration_covariance[0],
                            imu_msg.linear_acceleration_covariance[1],
                            imu_msg.linear_acceleration_covariance[2],
                            imu_msg.linear_acceleration_covariance[3],
                            imu_msg.linear_acceleration_covariance[4],
                            imu_msg.linear_acceleration_covariance[5],
                            imu_msg.linear_acceleration_covariance[6],
                            imu_msg.linear_acceleration_covariance[7],
                            imu_msg.linear_acceleration_covariance[8],
                        ],
                    };
                    pubber.publish(&msg).await?;
                    debug!("published cloud");
                }
                mt_net::SensorTypeMapped::Any(_) => {
                    error!(
                        "Any-Types are not supported for ROS1 due to different message encodings."
                    )
                }
                mt_net::SensorTypeMapped::Odometry(odom_msg) => {
                    let mut existing_pubber = odom_publishers.get(&data.topic);
                    if existing_pubber.is_none() {
                        let pubber = nh
                            .advertise::<nav_msgs::Odometry>(&data.topic, 10, false)
                            .await?;
                        odom_publishers.insert(data.topic.clone(), pubber);
                        existing_pubber = Some(
                            odom_publishers
                                .get(&data.topic)
                                .expect("Just inserted the line before"),
                        );
                    }
                    let pubber =
                        existing_pubber.expect("Should be inserted manually if not exists.");
                    let msg = nav_msgs::Odometry {
                        header: std_msgs::Header {
                            seq: 0,
                            stamp: Time {
                                secs: odom_msg.header.stamp.sec,
                                nsecs: i32::try_from(odom_msg.header.stamp.nanosec)?,
                            },
                            frame_id: odom_msg.header.frame_id,
                        },
                        child_frame_id: odom_msg.child_frame_id,
                        pose: geometry_msgs::PoseWithCovariance {
                            pose: geometry_msgs::Pose {
                                position: geometry_msgs::Point {
                                    x: odom_msg.pose.pose.position.x,
                                    y: odom_msg.pose.pose.position.y,
                                    z: odom_msg.pose.pose.position.z,
                                },
                                orientation: geometry_msgs::Quaternion {
                                    x: odom_msg.pose.pose.orientation.x,
                                    y: odom_msg.pose.pose.orientation.y,
                                    z: odom_msg.pose.pose.orientation.z,
                                    w: odom_msg.pose.pose.orientation.w,
                                },
                            },
                            covariance: odom_msg.pose.covariance,
                        },
                        twist: geometry_msgs::TwistWithCovariance {
                            twist: geometry_msgs::Twist {
                                linear: geometry_msgs::Vector3 {
                                    x: odom_msg.twist.twist.linear.x,
                                    y: odom_msg.twist.twist.linear.y,
                                    z: odom_msg.twist.twist.linear.z,
                                },
                                angular: geometry_msgs::Vector3 {
                                    x: odom_msg.twist.twist.angular.x,
                                    y: odom_msg.twist.twist.angular.y,
                                    z: odom_msg.twist.twist.angular.z,
                                },
                            },
                            covariance: odom_msg.twist.covariance,
                        },
                    };

                    pubber.publish(&msg).await?;
                    debug!("published odom");
                }
            }
        }
    }

    Ok(Some(()))
}

// TODO share in module so both submodules can uzse it
pub fn get_env_or_default(key: &str, default: &str) -> anyhow::Result<String> {
    match std::env::var(key) {
        Ok(name) => Ok(name),
        Err(e) => match e {
            std::env::VarError::NotPresent => Ok(default.to_owned()),
            std::env::VarError::NotUnicode(_os_string) => Err(anyhow!(
                "Could not fetch env variable because it is not unicode"
            )),
        },
    }
}

#[tokio::main]
#[allow(dead_code)]
async fn main() -> anyhow::Result<()> {
    mt_sea::init_logging();

    let wind_name = get_env_or_default("wind_name", "turbine_ros1")?;
    let master_uri = get_env_or_default("ROS_MASTER_URI", "http://localhost:11311")?;

    let (tx, rx) = tokio::sync::oneshot::channel();
    tokio::spawn(async move {
        // dump ready answer
        _ = rx.await;
    });
    run_dyn_wind(&master_uri, &wind_name, tx).await?;

    Ok(())
}