nodo_runtime 0.18.5

Runtime for NODO applications
Documentation
// Copyright 2025 David Weikersdorfer

use crate::{
    proto::{nodo as nodo_pb, nodo::AppStatistics},
    ProtoReportSettings,
};
use eyre::{bail, eyre};
use nodo::{
    app::{App, NodeInfo, ScheduleMonitor},
    codelet::{CountTotal, LifecycleStatus, ScheduleId, Transition},
    monitors::{AppMonitorData, MonitorStatus},
    prelude::*,
};
use std::time::Duration;

/// Assembles the protobuf report describing the current state of `app`.
///
/// The data of the schedule monitor and of the nodelet monitor is fetched once and every
/// section of the report is derived from it. The `meta` section is only filled in when
/// `settings.include_info` is set; `status`, `statistics`, `signals` and `monitors` are
/// always present.
///
/// # Panics
///
/// Panics if `to_data()` of either monitor does not yield a value.
pub(crate) fn proto_report_from_app(app: &App, settings: &ProtoReportSettings) -> nodo_pb::Report {
    let schedule_data = app.schedule_monitor().to_data().expect("TODO");
    let nodelet_data = app.nodelet_monitor().to_data().expect("TODO");

    let meta = if settings.include_info {
        Some(to_proto_app_meta(app, &nodelet_data))
    } else {
        None
    };
    let status = to_proto_app_status(app, &schedule_data, &nodelet_data);
    let statistics = to_proto_app_statistics(&nodelet_data);
    let signals = to_proto_app_signals(&nodelet_data);
    let monitors = to_proto_app_monitors(&nodelet_data);

    nodo_pb::Report {
        meta,
        app_info_version: 0, // FIXME
        status: Some(status),
        statistics: Some(statistics),
        signals: Some(signals),
        monitors: Some(monitors),
    }
}

/// Builds the `AppMeta` section: one entry per node and per schedule of `app`, and one entry
/// per monitor found in `monitor_data`.
///
/// Node, monitor and schedule ids are narrowed to `u32` with a plain `as` cast.
fn to_proto_app_meta<'a>(app: &App, monitor_data: &AppMonitorData) -> nodo_pb::AppMeta {
    nodo_pb::AppMeta {
        nodes: app
            .iter_nodes()
            .map(|(node_id, node)| {
                nodo_pb::NodeMeta {
                    id: node_id.0 as u32, // FIXME range check
                    schedule_name: node.schedule_name.clone(),
                    sequence_name: node.sequence_name.clone(),
                    node_name: node.node_name.clone(),
                    typename: node.typename.clone(),
                    rx_names: node.rx_names.clone(),
                    tx_names: node.tx_names.clone(),
                    signal_names: node.signal_names.clone(),
                }
            })
            .collect(),
        monitors: monitor_data
            .monitors
            .iter()
            .map(|monitor| nodo_pb::MonitorMeta {
                info: monitor.info.clone(),
                node_id: monitor.node_id.0 as u32, // FIXME range check
                key: Some(to_proto_gauge_key(&monitor.key)),
            })
            .collect(),
        schedules: app
            .iter_schedules()
            .map(|(schedule_id, schedule)| nodo_pb::ScheduleMeta {
                id: schedule_id.0 as u32,
                name: schedule.name.clone(),
                // The period is exported in seconds.
                period: schedule.period.map(|period| period.as_secs_f32()),
            })
            .collect(),
    }
}

/// Builds the `AppStatus` section from the monitor data.
///
/// Every observable in `nodelet_monitor_data` becomes one `NodeStatus`; every entry of
/// `schedule_monitor_data` becomes one `ScheduleStatus`. `_app` is currently unused.
fn to_proto_app_status(
    _app: &App,
    schedule_monitor_data: &ScheduleMonitor,
    nodelet_monitor_data: &AppMonitorData,
) -> nodo_pb::AppStatus {
    let nodes = nodelet_monitor_data
        .observables
        .iter()
        .map(|(_, obsv)| {
            let base = nodo_pb::NodeStatus::default();
            let lifecycle_status = to_proto_lifecycle_status(obsv.lifecycle_status).into();

            match &obsv.status {
                Some((label, status)) => nodo_pb::NodeStatus {
                    lifecycle_status,
                    label: label.clone(),
                    is_skipped: *status == DefaultStatus::Skipped,
                    ..base
                },
                // Without a recorded status the node counts as skipped and keeps the
                // default label.
                None => nodo_pb::NodeStatus {
                    lifecycle_status,
                    is_skipped: true,
                    ..base
                },
            }
        })
        .collect();

    let schedules = schedule_monitor_data
        .iter()
        .map(|(id, schedule)| {
            let id = <ScheduleId as Into<usize>>::into(id) as u32;
            nodo_pb::ScheduleStatus {
                id,
                lifecycle_status: to_proto_lifecycle_status(schedule.lifecycle_status).into(),
                // The last period is exported in seconds.
                last_period: schedule.last_period.map(|period| period.as_secs_f32()),
                last_error: schedule.last_error.clone(),
                has_panicked: schedule.has_panicked,
                has_finished: schedule.has_finished,
            }
        })
        .collect();

    nodo_pb::AppStatus { nodes, schedules }
}

/// Builds the `AppStatistics` section: per observable, the duration/period samples and the
/// skipped count of each of its transitions.
///
/// The rx/tx message statistics are left empty.
fn to_proto_app_statistics(data: &AppMonitorData) -> nodo_pb::AppStatistics {
    let nodes = data
        .observables
        .iter()
        .map(|(_, obsv)| {
            let transitions = obsv
                .statistics
                .transitions
                .iter()
                .map(|transition| nodo_pb::TransitionStatistics {
                    duration: Some(to_proto_count_total(&transition.duration)),
                    period: Some(to_proto_count_total(&transition.period)),
                    skipped_count: transition.skipped_count,
                })
                .collect();

            nodo_pb::NodeStatistics {
                transitions,
                rx_available_messages_count: Vec::new(),
                tx_published_message_count: Vec::new(),
                tx_last_pubtime: Vec::new(),
            }
        })
        .collect();

    nodo_pb::AppStatistics { nodes }
}

/// Builds the `AppSignals` section: per observable, one `NodeSignal` for each of its signals.
///
/// A signal whose cell holds no value is exported as a default `NodeSignal`.
fn to_proto_app_signals(data: &AppMonitorData) -> nodo_pb::AppSignals {
    let nodes = data
        .observables
        .iter()
        .map(|(_, obsv)| {
            let signals = obsv
                .signals
                .iter()
                .map(|signal| {
                    if let Some(SignalTimeValue { time, value }) = &signal.cell {
                        nodo_pb::NodeSignal {
                            pubtime: Some(to_proto_duration(time)),
                            value: Some(to_proto_signal_value(value)),
                        }
                    } else {
                        nodo_pb::NodeSignal::default()
                    }
                })
                .collect();

            nodo_pb::NodeSignals { signals }
        })
        .collect();

    nodo_pb::AppSignals { nodes }
}

/// Builds the `AppMonitors` section with one `Monitor` per entry of `data.monitors`.
///
/// A monitor's value and status are each stored as a `Result`. On `Ok` the converted payload
/// is exported and the matching `*_error` string stays empty. On `Err` the value is absent
/// (respectively the status is `Unspecified`) and the error's `Debug` output is exported
/// instead.
fn to_proto_app_monitors(data: &AppMonitorData) -> nodo_pb::AppMonitors {
    let monitors = data
        .monitors
        .iter()
        .map(|mon| {
            let (value, value_error) = match &mon.value {
                Ok(value) => (value.as_ref().map(to_proto_gauge_value), String::new()),
                Err(err) => (None, format!("{err:?}")),
            };
            let (status, status_error) = match &mon.status {
                Ok(status) => (to_proto_monitor_status(*status), String::new()),
                Err(err) => (nodo_pb::MonitorStatus::Unspecified, format!("{err:?}")),
            };

            nodo_pb::Monitor {
                pubtime: mon.pubtime.map(|t| to_proto_duration(&*t)),
                value,
                value_error,
                status: status.into(),
                status_error,
            }
        })
        .collect();

    nodo_pb::AppMonitors { monitors }
}

/// Converts a gauge key into its protobuf form: the variant becomes the `kind`, the wrapped
/// name becomes the `name`.
fn to_proto_gauge_key(src: &GaugeKey) -> nodo_pb::GaugeKey {
    type Kind = nodo_pb::GaugeKeyKind;

    let kind = match src {
        GaugeKey::SignalValue(_) => Kind::SignalValue,
        GaugeKey::SignalPubtime(_) => Kind::SignalPubtime,
        GaugeKey::RxAvailable(_) => Kind::RxAvailable,
        GaugeKey::TxTotal(_) => Kind::TxTotal,
        GaugeKey::TxPubtime(_) => Kind::TxPubtime,
    };

    // Every variant carries exactly one name.
    let name = match src {
        GaugeKey::SignalValue(name)
        | GaugeKey::SignalPubtime(name)
        | GaugeKey::RxAvailable(name)
        | GaugeKey::TxTotal(name)
        | GaugeKey::TxPubtime(name) => name,
    };

    nodo_pb::GaugeKey {
        kind: kind as i32,
        name: name.to_string(),
    }
}

/// Converts a gauge value into its protobuf form, variant by variant.
///
/// `Usize` is widened to `u64`; `Pubtime` and `Acqtime` are converted with
/// `to_proto_duration`.
fn to_proto_gauge_value(src: &GaugeValue) -> nodo_pb::GaugeValue {
    type Proto = nodo_pb::gauge_value::Value;

    nodo_pb::GaugeValue {
        value: Some(match src {
            GaugeValue::Bool(flag) => Proto::Bool(*flag),
            GaugeValue::Int64(number) => Proto::Int64(*number),
            GaugeValue::Usize(number) => Proto::Usize(*number as u64),
            GaugeValue::Float64(number) => Proto::Float64(*number),
            GaugeValue::String(text) => Proto::String(text.clone()),
            GaugeValue::Pubtime(time) => Proto::Pubtime(to_proto_duration(time)),
            GaugeValue::Acqtime(time) => Proto::Acqtime(to_proto_duration(time)),
        }),
    }
}

/// Maps a monitor status onto the protobuf variant of the same name.
fn to_proto_monitor_status(src: MonitorStatus) -> nodo_pb::MonitorStatus {
    type Proto = nodo_pb::MonitorStatus;

    match src {
        MonitorStatus::Critical => Proto::Critical,
        MonitorStatus::Warning => Proto::Warning,
        MonitorStatus::Nominal => Proto::Nominal,
    }
}

/// Converts accumulated samples into their protobuf form.
///
/// The sample count and the min/max values are copied as reported by `src`; the total is
/// exported in seconds.
fn to_proto_count_total(src: &CountTotal) -> nodo_pb::StatisticsSamples {
    let count = src.count();
    let total = src.total().as_secs_f32();
    let min_ms = src.min_ms();
    let max_ms = src.max_ms();

    nodo_pb::StatisticsSamples {
        count,
        total,
        min_ms,
        max_ms,
    }
}

/// Converts a [`Duration`] into its protobuf form (whole seconds plus sub-second nanoseconds).
///
/// The protobuf message stores the whole seconds as `u32`. A duration with more whole
/// seconds than fit is clamped to `u32::MAX` seconds instead of silently wrapping around to
/// a small, misleading value. The sub-second nanoseconds are carried over unchanged.
fn to_proto_duration(src: &Duration) -> nodo_pb::Duration {
    // Clamp while still in u64 so that the narrowing cast below cannot truncate.
    let secs = src.as_secs().min(u64::from(u32::MAX)) as u32;

    nodo_pb::Duration {
        secs,
        nanos: src.subsec_nanos(),
    }
}

/// Converts a signal value into its protobuf form, variant by variant.
///
/// `Usize` is widened to `u64`; strings are cloned.
fn to_proto_signal_value(src: &SignalValue) -> nodo_pb::node_signal::Value {
    type Proto = nodo_pb::node_signal::Value;

    match src {
        SignalValue::Bool(flag) => Proto::Bool(*flag),
        SignalValue::Int64(number) => Proto::Int64(*number),
        SignalValue::Usize(number) => Proto::Usize(*number as u64),
        SignalValue::Float64(number) => Proto::Float64(*number),
        SignalValue::String(text) => Proto::String(text.clone()),
    }
}

fn to_proto_lifecycle_status(ls: LifecycleStatus) -> nodo_pb::LifecycleStatus {
    match ls {
        LifecycleStatus::Inactive => nodo_pb::LifecycleStatus::Inactive,
        LifecycleStatus::Starting => nodo_pb::LifecycleStatus::Starting,
        LifecycleStatus::Running => nodo_pb::LifecycleStatus::Running,
        LifecycleStatus::Pausing => nodo_pb::LifecycleStatus::Pausing,
        LifecycleStatus::Paused => nodo_pb::LifecycleStatus::Paused,
        LifecycleStatus::Resuming => nodo_pb::LifecycleStatus::Resuming,
        LifecycleStatus::Stopping => nodo_pb::LifecycleStatus::Stopping,
        LifecycleStatus::Error => nodo_pb::LifecycleStatus::Error,
    }
}

pub fn proto_report_pretty_print(report: nodo_pb::Report) -> eyre::Result<()> {
    struct Entry {
        info: NodeInfo,
        stats: AppStatistics,
    }

    let mut entries = match (report.meta, report.statistics) {
        (Some(info), Some(statistics)) => {
            if info.nodes.len() != statistics.nodes.len() {
                bail!("invalid report: nodes/statistics len mismatch");
            }
            info.nodes
                .into_iter()
                .zip(statistics.nodes.into_iter())
                .collect::<Vec<_>>()
        }
        _ => bail!("report missing info or statistics"),
    };

    entries.sort_by_key(|(_, stats)| {
        (stats.transitions[Transition::Step.index()]
            .duration
            .unwrap()
            .total
            * 1000000.) as i64
    });

    println!("");
    println!("+----+--------------------------+----------------------------------+--------+--------+----------------------+-------+----------------------+--------+---------+");
    println!("| ID | NAME                     | TYPE                             | STEP              Duration                       Period               | START            |");
    println!("|    |                          |                                  | Skipped| Count  | (min-avg-max) [ms]   | Total | (min-avg-max) [ms]   | Count  |  D [ms] |");
    println!("+----+--------------------------+----------------------------------+--------+--------+----------------------+-------+----------------------+--------+---------+");
    for (
        nodo_pb::NodeMeta {
            id,
            node_name,
            typename,
            ..
        },
        nodo_pb::NodeStatistics { transitions, .. },
    ) in entries.into_iter().rev()
    {
        let transitions_step = transitions[Transition::Step.index()];
        let transitions_step_duration = transitions_step
            .duration
            .ok_or_else(|| eyre!("invalid report"))?;
        let transitions_step_period = transitions_step
            .period
            .ok_or_else(|| eyre!("invalid report"))?;

        let transitions_start = transitions[Transition::Start.index()];
        let transitions_start_duration = transitions_start
            .duration
            .ok_or_else(|| eyre!("invalid report"))?;

        println!(
            "|{:3} | {:024} | {:032} | {:6} | {:6} | {} {} {} |{} | {} {} {} | {:2} /{:2} | {} |",
            format!("{id:>3}"),
            cut_middle(&node_name, 24),
            cut_middle(&typename, 32),
            transitions_step.skipped_count,
            transitions_step_duration.count,
            transitions_step_duration
                .min_ms
                .map(|dt| format!("{:>6.2}", dt))
                .unwrap_or("------".to_string()),
            average_ms(&transitions_step_duration)
                .map(|dt| format!("{:>6.2}", dt))
                .unwrap_or("------".to_string()),
            transitions_step_duration
                .max_ms
                .map(|dt| format!("{:>6.2}", dt))
                .unwrap_or("------".to_string()),
            format!("{:>6.2}", transitions_step_duration.total),
            transitions_step_period
                .min_ms
                .map(|dt| format!("{:>6.2}", dt))
                .unwrap_or("------".to_string()),
            average_ms(&transitions_step_period)
                .map(|dt| format!("{:>6.2}", dt))
                .unwrap_or("------".to_string()),
            transitions_step_period
                .max_ms
                .map(|dt| format!("{:>6.2}", dt))
                .unwrap_or("------".to_string()),
            transitions_start.skipped_count,
            transitions_start_duration.count,
            average_ms(&transitions_start_duration)
                .map(|dt| format!("{:>7.2}", dt))
                .unwrap_or("-------".to_string()),
        );
    }
    println!("+----+--------------------------+----------------------------------+--------+--------+----------------------+-------+----------------------+--------+---------+");

    Ok(())
}

/// Shortens `text` to at most `len` characters by replacing its middle with `..`.
///
/// The result keeps the first two characters and the last `len - 4` characters of `text`.
/// Text that already fits is returned unchanged, and so is any text when `len <= 6`.
///
/// Lengths are measured in characters rather than bytes, so text containing multi-byte
/// UTF-8 characters is cut on character boundaries instead of panicking.
fn cut_middle(text: &str, len: usize) -> String {
    let char_count = text.chars().count();
    if char_count <= len || len <= 6 {
        return text.to_string();
    }

    // Here `char_count > len > 6`, so the subtraction below cannot underflow.
    let tail_len = len - 4;
    let mut shortened: String = text.chars().take(2).collect();
    shortened.push_str("..");
    shortened.extend(text.chars().skip(char_count - tail_len));
    shortened
}

/// Average of the samples in milliseconds, or `None` when there are no samples.
///
/// The total is scaled by 1000 and divided by the sample count.
fn average_ms(sample: &nodo_pb::StatisticsSamples) -> Option<f32> {
    if sample.count > 0 {
        Some(1000.0 * sample.total / sample.count as f32)
    } else {
        None
    }
}