px4sim 0.1.6

A wrapper to simplify creating a custom PX4 SITL simulator.
Documentation
use log::warn;
use mavlink::{
    common::{MavCmd, MavMessage, HIL_ACTUATOR_CONTROLS_DATA},
    connect, MavConnection,
};
use std::{
    any::Any,
    sync::{
        mpsc::{channel, Receiver, RecvError, RecvTimeoutError, SendError, Sender, TryRecvError},
        Arc,
    },
    thread::{spawn, JoinHandle},
    time::Duration,
};

use crate::data::DroneData;
use crate::messages::{
    make_header, make_heartbeat, make_hil_gps, make_hil_sensor, make_hil_state_quaternion,
    make_system_time,
};
use crate::util::start_px4;

pub type DroneLink = Arc<Box<dyn MavConnection<MavMessage> + Sync + Send>>;
pub type ActuatorData = HIL_ACTUATOR_CONTROLS_DATA;

pub struct DroneSimulator {
    data_sink: Sender<DroneData>,
    control_source: Receiver<ActuatorData>,
    worker: Option<JoinHandle<()>>,
}

impl DroneSimulator {
    pub fn send(&self, data: DroneData) -> Result<(), SendError<DroneData>> {
        self.data_sink.send(data)
    }

    pub fn recv(&self) -> Result<ActuatorData, RecvError> {
        self.control_source.recv()
    }

    pub fn recv_timeout(&self, timeout: Duration) -> Result<ActuatorData, RecvTimeoutError> {
        self.control_source.recv_timeout(timeout)
    }

    pub fn try_recv(&self) -> Result<ActuatorData, TryRecvError> {
        self.control_source.try_recv()
    }

    pub fn last(&self) -> Option<ActuatorData> {
        self.control_source.try_iter().last()
    }

    pub fn join(&mut self) -> Result<(), Box<dyn Any + Send>> {
        match self.worker.take() {
            Some(handle) => handle.join(),
            None => Ok(()),
        }
    }
}

fn validate_first_message(link: &DroneLink, system_id: u8) {
    let (header, msg) = link.recv().unwrap();
    assert_eq!(header.system_id, system_id);
    match msg {
        MavMessage::COMMAND_LONG(data) => {
            assert_eq!(data.command, MavCmd::MAV_CMD_SET_MESSAGE_INTERVAL);
            assert_eq!(data.param1, 115.); // HIL_STATE_QUATERNION message
            assert_eq!(data.param2, 5000.); // 5 ms interval
        }
        _ => panic!("Expected COMMAND_LONG, received: {msg:?}"),
    }
}

fn validate_second_message(link: &DroneLink, system_id: u8) {
    let (header, msg) = link.recv().unwrap();
    assert_eq!(header.system_id, system_id);
    match msg {
        MavMessage::HEARTBEAT(_data) => {
            // assert data here
        }
        _ => panic!("Expected HEARTBEAT, received: {msg:?}"),
    }
}

fn send_in_thread(
    link: DroneLink,
    rx: Receiver<DroneData>,
    system_id: u8,
    boot_time: u32,
) -> JoinHandle<()> {
    spawn({
        let send_message = { move |msg: &MavMessage| link.send(&make_header(system_id), msg) };
        move || loop {
            match rx.recv() {
                Ok(data) => {
                    if [
                        send_message(&make_system_time(boot_time)),
                        send_message(&make_heartbeat()),
                        send_message(&make_hil_gps(&data)),
                        send_message(&make_hil_state_quaternion(&data)),
                        send_message(&make_hil_sensor(&data)),
                    ]
                    .iter()
                    .any(|r| r.is_err())
                    {
                        warn!("Sender thread for system {system_id}: simulator link broken. Quitting.");
                        return;
                    }
                }
                Err(_) => {
                    warn!("Sender thread for system {system_id}: data channel broken. Quitting.");
                    return;
                }
            }
        }
    })
}

fn receive_in_thread(
    link: DroneLink,
    tx: Sender<HIL_ACTUATOR_CONTROLS_DATA>,
    system_id: u8,
) -> JoinHandle<()> {
    spawn(move || loop {
        match link.recv() {
            Ok((header, msg)) => {
                assert_eq!(header.system_id, system_id);
                match msg {
                    MavMessage::HIL_ACTUATOR_CONTROLS(data) => {
                        if tx.send(data).is_err() {
                            warn!("Receiver thread for system {system_id}: data channel broken. Quitting.");
                            return;
                        }
                    }
                    _ => panic!("Expected HIL_ACTUATOR_CONTROLS, received: {msg:?}"),
                }
            }
            Err(_) => {
                warn!("Receiver thread for system {system_id}: simulator link broken. Quitting.");
                return;
            }
        }
    })
}

impl DroneSimulator {
    pub fn new(system_id: u8, boot_time: u32, px4_path_airframe: Option<(&str, &str)>) -> Self {
        let px4 = match px4_path_airframe {
            Some((path, airframe)) => Some(start_px4(path, system_id, airframe).unwrap()),
            None => None,
        };
        let link = {
            let protocol = "tcpin";
            let ip = "127.0.0.1";
            let port = 4560 + system_id as u32;
            let address = format!("{protocol}:{ip}:{port}");
            let address = &address[..];
            connect::<MavMessage>(address).unwrap()
        };
        let link = Arc::new(link);
        //
        validate_first_message(&link, system_id + 1);
        validate_second_message(&link, system_id + 1);
        //
        let (data_sink, rx) = channel();
        let sender = send_in_thread(link.clone(), rx, system_id + 1, boot_time);
        //
        let (tx, control_source) = channel();
        let receiver = receive_in_thread(link.clone(), tx, system_id + 1);
        //
        let worker = Some(spawn(move || {
            sender.join().unwrap();
            receiver.join().unwrap();
            if let Some(mut child) = px4 {
                child.kill().unwrap();
                child.wait().unwrap();
            }
        }));
        //
        DroneSimulator {
            data_sink,
            control_source,
            worker,
        }
    }
}