nodo 0.18.5

A realtime framework for robotics
Documentation
use crate::{
    codelet::{
        Clocks, LifecycleStatus, NodeId, NodeletSetup, ScheduleBuilder, ScheduleId,
        SharedNodeCrumbs, Vise, ViseTrait,
    },
    core::Clock,
    opt_vec::OptVec,
    prelude::SharedAppMonitor,
};
use eyre::eyre;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    time::Duration,
};

/// Application state: clocks, monitors, and the registries of nodes and schedules.
pub struct App {
    /// Clocks; cloned into every node's setup and read when checking for stalled schedules.
    clocks: Clocks,
    /// Monitor handle that is cloned into every node's `NodeletSetup`.
    nodelet_monitor: SharedAppMonitor,
    /// Monitor that receives an entry for each schedule when its setup context is created.
    schedule_monitor: SharedScheduleMonitor,
    /// Node metadata; the slab key is the value wrapped by `NodeId`.
    nodes: Slab<Arc<NodeInfo>>,
    /// Crumbs keyed by schedule name; handed to all nodes set up under that schedule.
    crumbs: HashMap<String, SharedNodeCrumbs>,
    /// Schedule metadata; the slab key is the value wrapped by `ScheduleId`.
    schedules: Slab<Arc<ScheduleInfo>>,
}

impl App {
    /// Creates an app with fresh clocks, the given monitors, and no nodes or schedules.
    pub fn new(nodelet_monitor: SharedAppMonitor, schedule_monitor: SharedScheduleMonitor) -> Self {
        Self {
            clocks: Clocks::new(),
            nodelet_monitor,
            schedule_monitor,
            nodes: Default::default(),
            crumbs: Default::default(),
            schedules: Default::default(),
        }
    }

    /// Creates a context used to add a new schedule to the app.
    ///
    /// The schedule described by `builder` is registered as part of creating the context.
    pub fn schedule_setup_context<'a>(
        &'a mut self,
        builder: &ScheduleBuilder,
    ) -> AppSetupScheduleContext<'a> {
        AppSetupScheduleContext::new(self, builder)
    }

    /// Returns the monitor handle shared with the nodes of this app.
    pub fn nodelet_monitor(&self) -> &SharedAppMonitor {
        &self.nodelet_monitor
    }

    /// Returns the monitor handle tracking the schedules of this app.
    pub fn schedule_monitor(&self) -> &SharedScheduleMonitor {
        &self.schedule_monitor
    }

    /// Iterates over all registered nodes together with their IDs.
    pub fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &Arc<NodeInfo>)> {
        self.nodes.iter().map(|(key, info)| (NodeId(key), info))
    }

    /// Iterates over all registered schedules together with their IDs.
    pub fn iter_schedules(&self) -> impl Iterator<Item = (ScheduleId, &Arc<ScheduleInfo>)> {
        self.schedules
            .iter()
            .map(|(key, info)| (ScheduleId(key), info))
    }

    /// Returns `true` if a node with exactly this name is already registered.
    fn has_node_with_name(&self, node_name: &str) -> bool {
        self.nodes
            .iter()
            .any(|(_, info)| info.node_name == node_name)
    }

    /// Returns the crumbs stored for `schedule_name`, creating and storing them on first use.
    fn get_or_add_schedule_crumbs(&mut self, schedule_name: &str) -> SharedNodeCrumbs {
        // The entry API resolves the "look up, else insert" case with a single map access.
        self.crumbs
            .entry(String::from(schedule_name))
            .or_insert_with(SharedNodeCrumbs::new)
            .clone()
    }

    /// Stores node metadata and returns the ID under which it can be found.
    fn insert_node(&mut self, info: Arc<NodeInfo>) -> NodeId {
        NodeId(self.nodes.insert(info))
    }

    /// Stores schedule metadata and returns the ID under which it can be found.
    fn insert_schedule(&mut self, info: Arc<ScheduleInfo>) -> ScheduleId {
        ScheduleId(self.schedules.insert(info))
    }

    /// Logs a warning for every schedule whose latest crumb is more than one second old.
    ///
    /// The age is measured against the current time of the `app_mono` clock. Schedules
    /// whose crumbs cannot be read are skipped.
    pub fn check_for_stalled_schedules(&self) {
        /// Crumb age in seconds above which a schedule is reported as stalled.
        const STALL_THRESHOLD_SECS: f64 = 1.0;

        let now = self.clocks.app_mono.now();
        for crumbs in self.crumbs.values() {
            if let Some(crumbs) = crumbs.read() {
                let duration = now.saturating_sub(*crumbs.time).as_secs_f64();
                if duration > STALL_THRESHOLD_SECS {
                    // A diagnostic routine must not panic: fall back to a placeholder
                    // name if the crumb refers to a node ID that is not in the slab.
                    let name = self
                        .nodes
                        .get(crumbs.node_id.0)
                        .map_or("<unknown>", |info| info.node_name.as_str());
                    log::warn!(
                        "execution stalled: name={}, transition={:?}, duration={}",
                        name,
                        crumbs.transition,
                        duration,
                    );
                }
            }
        }
    }
}

/// Metadata information about a node
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Name of the schedule under which the node was set up.
    pub schedule_name: String,
    /// Name of the sequence that was active when the node was set up.
    pub sequence_name: String,
    /// Name of the node. Duplicates only trigger a warning during setup, so names
    /// are not guaranteed to be unique.
    pub node_name: String,
    /// Type name reported for the node during setup.
    pub typename: String,
    /// Names of the node's RX channels; positions are what `rx_index_by_name` returns.
    pub rx_names: Vec<String>,
    /// Names of the node's TX channels; positions are what `tx_index_by_name` returns.
    pub tx_names: Vec<String>,
    /// Names of the node's signals; positions are what `signal_index_by_name` returns.
    pub signal_names: Vec<String>,
}

impl NodeInfo {
    /// Returns the position of the first RX channel named `needle`, if any.
    pub fn rx_index_by_name(&self, needle: &str) -> Option<usize> {
        Self::position_of_name(&self.rx_names, needle)
    }

    /// Returns the position of the first TX channel named `needle`, if any.
    pub fn tx_index_by_name(&self, needle: &str) -> Option<usize> {
        Self::position_of_name(&self.tx_names, needle)
    }

    /// Returns the position of the first signal named `needle`, if any.
    pub fn signal_index_by_name(&self, needle: &str) -> Option<usize> {
        Self::position_of_name(&self.signal_names, needle)
    }

    /// Position of the first entry of `names` that equals `needle` exactly.
    fn position_of_name(names: &[String], needle: &str) -> Option<usize> {
        names.iter().position(|candidate| candidate == needle)
    }
}

/// Metadata information about a schedule
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct ScheduleInfo {
    /// Name of the schedule, copied from the schedule builder.
    pub name: String,
    /// Period copied from the schedule builder.
    /// NOTE(review): presumably `None` means the schedule is not periodic — confirm.
    pub period: Option<Duration>,
}

/// Context used while adding one schedule and its nodes to an [`App`].
pub struct AppSetupScheduleContext<'a> {
    /// The app the schedule and its nodes are registered with.
    app: &'a mut App,
    /// ID assigned to the schedule when this context was created.
    id: ScheduleId,
    /// Name of the schedule; recorded in the metadata of every node added via `on_node`.
    schedule_name: String,
    /// Name of the sequence currently being set up; `None` outside of a sequence.
    sequence_name: Option<String>,
    /// Crumbs of this schedule; a clone is handed to every node added via `on_node`.
    crumbs: SharedNodeCrumbs,
}

impl<'a> AppSetupScheduleContext<'a> {
    /// Registers the schedule described by `builder` with `app` and returns the
    /// context through which the schedule's nodes are added.
    ///
    /// The schedule's crumbs are looked up (or created) by name, its metadata is
    /// stored in the app, and an entry is added to the app's schedule monitor.
    pub fn new(app: &'a mut App, builder: &ScheduleBuilder) -> Self {
        let crumbs = app.get_or_add_schedule_crumbs(&builder.name);

        let schedule_info = ScheduleInfo {
            name: builder.name.clone(),
            period: builder.period,
        };
        let id = app.insert_schedule(Arc::new(schedule_info));
        app.schedule_monitor.insert(id);

        let schedule_name = builder.name.clone();
        Self {
            app,
            id,
            schedule_name,
            sequence_name: None,
            crumbs,
        }
    }

    /// ID of the schedule this context was created for.
    pub fn id(&self) -> ScheduleId {
        self.id
    }

    /// Marks `sequence_name` as the sequence that subsequently added nodes belong to.
    pub fn on_begin_sequence(&mut self, sequence_name: String) {
        self.sequence_name = Some(sequence_name);
    }

    /// Clears the current sequence; `on_node` must not be called until a new one begins.
    pub fn on_end_sequence(&mut self) {
        self.sequence_name = None;
    }

    /// Registers the node held by `vise` with the app and runs the node's setup.
    ///
    /// A duplicate node name is only reported with a warning; the node is added anyway.
    ///
    /// # Panics
    ///
    /// Panics if no sequence is active, i.e. `on_begin_sequence` was not called first.
    pub fn on_node(&mut self, vise: &mut Vise) {
        let node_name = vise.name().to_string();
        let is_duplicate = self.app.has_node_with_name(&node_name);
        if is_duplicate {
            log::warn!("duplicate node name: {node_name}");
        }

        // Collect the metadata pieces in the same order in which they are stored.
        let schedule_name = self.schedule_name.clone();
        let sequence_name = self
            .sequence_name
            .clone()
            .expect("internal error: on_begin_sequence must be called before on_node");
        let typename = vise.type_name().to_string();
        let rx_names = vise
            .rx_names()
            .into_iter()
            .map(ToString::to_string)
            .collect();
        let tx_names = vise
            .tx_names()
            .into_iter()
            .map(ToString::to_string)
            .collect();
        let signal_names = vise
            .signal_names()
            .into_iter()
            .map(ToString::to_string)
            .collect();

        let info = Arc::new(NodeInfo {
            schedule_name,
            sequence_name,
            node_name,
            typename,
            rx_names,
            tx_names,
            signal_names,
        });

        // The app keeps one handle to the metadata, the node receives the other.
        let node_id = self.app.insert_node(Arc::clone(&info));

        let setup = NodeletSetup {
            info,
            clocks: self.app.clocks.clone(),
            node_id,
            monitor: self.app.nodelet_monitor.clone(),
            crumbs: self.crumbs.clone(),
        };
        vise.setup(setup);
    }

    /// Returns a clone of the app's nodelet monitor handle.
    pub fn monitor(&self) -> SharedAppMonitor {
        let shared = &self.app.nodelet_monitor;
        shared.clone()
    }
}

/// Cloneable handle to a [`ScheduleMonitor`] kept behind an `Arc<RwLock<..>>`.
///
/// Clones refer to the same underlying monitor.
#[derive(Clone)]
pub struct SharedScheduleMonitor(Arc<RwLock<ScheduleMonitor>>);

impl SharedScheduleMonitor {
    /// Creates a handle to a new, empty schedule monitor.
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(ScheduleMonitor::default())))
    }

    /// Stores a default-initialized entry for the schedule `id`.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned, i.e. another thread panicked while holding it.
    pub fn insert(&self, id: ScheduleId) {
        let mut monitor = self
            .0
            .write()
            .expect("schedule monitor lock must not be poisoned");
        monitor
            .schedules
            .insert(id.into(), ScheduleMonitorEntry::default());
    }

    /// Runs `f` on the entry of schedule `id` while holding the write lock.
    ///
    /// `f` is called at most once, so it may move captured values into the entry.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock is poisoned or if there is no entry for `id`;
    /// `f` is not called in either case.
    pub fn edit<F>(&self, id: ScheduleId, f: F) -> eyre::Result<()>
    where
        F: FnOnce(&mut ScheduleMonitorEntry),
    {
        let mut monitor = self.0.write().map_err(|_| eyre!("lock poisoned"))?;
        let entry = monitor
            .get_mut(id)
            .ok_or_else(|| eyre!("invalid schedule ID: {id:?}"))?;
        f(entry);
        Ok(())
    }

    /// Returns a snapshot (clone) of the current monitor data.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock is poisoned.
    pub fn to_data(&self) -> eyre::Result<ScheduleMonitor> {
        let monitor = self.0.read().map_err(|_| eyre!("lock poisoned"))?;
        Ok(monitor.clone())
    }
}

/// Monitoring data for all schedules, addressed by [`ScheduleId`].
#[derive(Default, Clone)]
pub struct ScheduleMonitor {
    /// One entry per monitored schedule, stored at the index derived from its `ScheduleId`.
    schedules: OptVec<ScheduleMonitorEntry>,
}

impl ScheduleMonitor {
    /// Iterates over the stored entries as `(ScheduleId, entry)` pairs.
    pub fn iter(&self) -> impl Iterator<Item = (ScheduleId, &ScheduleMonitorEntry)> {
        let entries = self.schedules.iter();
        entries.map(|(index, entry)| (ScheduleId(index), entry))
    }

    /// Returns the entry stored for `id`, or `None` if there is none.
    pub fn get(&self, id: ScheduleId) -> Option<&ScheduleMonitorEntry> {
        let entries = &self.schedules;
        entries.get(id.into())
    }

    /// Returns a mutable reference to the entry stored for `id`, or `None` if there is none.
    pub fn get_mut(&mut self, id: ScheduleId) -> Option<&mut ScheduleMonitorEntry> {
        let entries = &mut self.schedules;
        entries.get_mut(id.into())
    }
}

/// Monitoring data recorded for a single schedule.
///
/// A new entry starts out with the `Default` value of every field.
#[derive(Default, Clone)]
pub struct ScheduleMonitorEntry {
    /// Lifecycle status recorded for the schedule.
    pub lifecycle_status: LifecycleStatus,
    /// NOTE(review): presumably the most recently measured period of the schedule;
    /// `None` until one has been recorded — confirm with the code writing this field.
    pub last_period: Option<Duration>,
    /// NOTE(review): presumably the message of the most recent error, if any — confirm.
    pub last_error: Option<String>,
    /// NOTE(review): presumably set once the schedule's execution has panicked — confirm.
    pub has_panicked: bool,
    /// NOTE(review): presumably set once the schedule has finished executing — confirm.
    pub has_finished: bool,
}