use crate::{
proto::{nodo as nodo_pb, nodo::AppStatistics},
ProtoReportSettings,
};
use eyre::{bail, eyre};
use nodo::{
app::{App, NodeInfo, ScheduleMonitor},
codelet::{CountTotal, LifecycleStatus, ScheduleId, Transition},
monitors::{AppMonitorData, MonitorStatus},
prelude::*,
};
use std::time::Duration;
/// Builds a protobuf [`nodo_pb::Report`] from the current state of `app`.
///
/// Status, statistics, signals and monitors are always populated. The app meta data is only
/// attached when `settings.include_info` is set.
///
/// # Panics
///
/// Panics if `to_data()` fails for either the schedule monitor or the nodelet monitor of the
/// app. The panic message names the monitor which failed.
pub(crate) fn proto_report_from_app(app: &App, settings: &ProtoReportSettings) -> nodo_pb::Report {
    let schedule_monitor_data = app
        .schedule_monitor()
        .to_data()
        .expect("failed to get schedule monitor data while building report");
    let nodelet_monitor_data = app
        .nodelet_monitor()
        .to_data()
        .expect("failed to get nodelet monitor data while building report");

    nodo_pb::Report {
        meta: settings
            .include_info
            .then(|| to_proto_app_meta(app, &nodelet_monitor_data)),
        // NOTE(review): hard-coded to 0 here; presumably a placeholder — confirm with consumers.
        app_info_version: 0,
        status: Some(to_proto_app_status(
            app,
            &schedule_monitor_data,
            &nodelet_monitor_data,
        )),
        statistics: Some(to_proto_app_statistics(&nodelet_monitor_data)),
        signals: Some(to_proto_app_signals(&nodelet_monitor_data)),
        monitors: Some(to_proto_app_monitors(&nodelet_monitor_data)),
    }
}
/// Collects the meta data section of a report: one entry per node, per monitor and per
/// schedule.
///
/// Nodes and schedules are taken from `app`, monitors from `monitor_data`. Node, monitor and
/// schedule ids are narrowed to `u32` for the protobuf message.
fn to_proto_app_meta(app: &App, monitor_data: &AppMonitorData) -> nodo_pb::AppMeta {
    let nodes = app
        .iter_nodes()
        .map(|(id, info)| nodo_pb::NodeMeta {
            id: id.0 as u32,
            schedule_name: info.schedule_name.clone(),
            sequence_name: info.sequence_name.clone(),
            node_name: info.node_name.clone(),
            typename: info.typename.clone(),
            rx_names: info.rx_names.clone(),
            tx_names: info.tx_names.clone(),
            signal_names: info.signal_names.clone(),
        })
        .collect();

    let monitors = monitor_data
        .monitors
        .iter()
        .map(|mon| nodo_pb::MonitorMeta {
            info: mon.info.clone(),
            node_id: mon.node_id.0 as u32,
            key: Some(to_proto_gauge_key(&mon.key)),
        })
        .collect();

    let schedules = app
        .iter_schedules()
        .map(|(id, info)| nodo_pb::ScheduleMeta {
            id: id.0 as u32,
            name: info.name.clone(),
            // The period is reported in seconds.
            period: info.period.map(|d| d.as_secs_f32()),
        })
        .collect();

    nodo_pb::AppMeta {
        nodes,
        monitors,
        schedules,
    }
}
/// Builds the status section of a report from the monitor snapshots.
///
/// Every observable yields one node status and every schedule monitor entry yields one
/// schedule status. A node without a status is reported as skipped with an empty label.
/// `_app` is not used.
fn to_proto_app_status(
    _app: &App,
    schedule_monitor_data: &ScheduleMonitor,
    nodelet_monitor_data: &AppMonitorData,
) -> nodo_pb::AppStatus {
    let nodes = nodelet_monitor_data
        .observables
        .iter()
        .map(|(_, observable)| {
            let mut node = nodo_pb::NodeStatus::default();
            node.lifecycle_status =
                to_proto_lifecycle_status(observable.lifecycle_status).into();
            if let Some((label, status)) = &observable.status {
                node.label = label.clone();
                node.is_skipped = *status == DefaultStatus::Skipped;
            } else {
                // Without a reported status the node counts as skipped.
                node.is_skipped = true;
            }
            node
        })
        .collect();

    let schedules = schedule_monitor_data
        .iter()
        .map(|(id, schedule)| {
            let lifecycle = to_proto_lifecycle_status(schedule.lifecycle_status);
            nodo_pb::ScheduleStatus {
                id: <ScheduleId as Into<usize>>::into(id) as u32,
                lifecycle_status: lifecycle.into(),
                last_period: schedule.last_period.map(|d| d.as_secs_f32()),
                last_error: schedule.last_error.clone(),
                has_panicked: schedule.has_panicked,
                has_finished: schedule.has_finished,
            }
        })
        .collect();

    nodo_pb::AppStatus { nodes, schedules }
}
/// Builds the statistics section of a report with one entry per observable.
///
/// Only transition statistics are filled in; the rx/tx message counters and publish times are
/// left empty.
fn to_proto_app_statistics(data: &AppMonitorData) -> nodo_pb::AppStatistics {
    let nodes = data
        .observables
        .iter()
        .map(|(_, observable)| {
            let transitions = observable
                .statistics
                .transitions
                .iter()
                .map(|transition| nodo_pb::TransitionStatistics {
                    duration: Some(to_proto_count_total(&transition.duration)),
                    period: Some(to_proto_count_total(&transition.period)),
                    skipped_count: transition.skipped_count,
                })
                .collect();

            nodo_pb::NodeStatistics {
                transitions,
                rx_available_messages_count: Vec::new(),
                tx_published_message_count: Vec::new(),
                tx_last_pubtime: Vec::new(),
            }
        })
        .collect();

    nodo_pb::AppStatistics { nodes }
}
/// Builds the signals section of a report with one entry per observable.
///
/// A signal whose cell holds a time/value pair is converted with its publish time and value;
/// a signal with an empty cell becomes a default `NodeSignal`.
fn to_proto_app_signals(data: &AppMonitorData) -> nodo_pb::AppSignals {
    let nodes = data
        .observables
        .iter()
        .map(|(_, observable)| {
            let signals = observable
                .signals
                .iter()
                .map(|signal| {
                    if let Some(SignalTimeValue { time, value }) = &signal.cell {
                        nodo_pb::NodeSignal {
                            pubtime: Some(to_proto_duration(time)),
                            value: Some(to_proto_signal_value(value)),
                        }
                    } else {
                        nodo_pb::NodeSignal::default()
                    }
                })
                .collect();

            nodo_pb::NodeSignals { signals }
        })
        .collect();

    nodo_pb::AppSignals { nodes }
}
/// Builds the monitors section of a report with one entry per monitor.
///
/// A failed value is reported as no value plus the debug representation of the error. A
/// failed status is reported as `Unspecified` plus the debug representation of the error.
/// The error strings are empty on success.
fn to_proto_app_monitors(data: &AppMonitorData) -> nodo_pb::AppMonitors {
    let monitors = data
        .monitors
        .iter()
        .map(|mon| {
            let (value, value_error) = match &mon.value {
                Ok(gauge) => (gauge.as_ref().map(to_proto_gauge_value), String::new()),
                Err(err) => (None, format!("{err:?}")),
            };
            let (status, status_error) = match &mon.status {
                Ok(status) => (to_proto_monitor_status(*status), String::new()),
                Err(err) => (nodo_pb::MonitorStatus::Unspecified, format!("{err:?}")),
            };

            nodo_pb::Monitor {
                pubtime: mon.pubtime.map(|t| to_proto_duration(&*t)),
                value,
                value_error,
                status: status.into(),
                status_error,
            }
        })
        .collect();

    nodo_pb::AppMonitors { monitors }
}
/// Converts a gauge key into its protobuf form: the kind of the key plus its name.
fn to_proto_gauge_key(src: &GaugeKey) -> nodo_pb::GaugeKey {
    type Kind = nodo_pb::GaugeKeyKind;

    let kind = match src {
        GaugeKey::SignalValue(_) => Kind::SignalValue,
        GaugeKey::SignalPubtime(_) => Kind::SignalPubtime,
        GaugeKey::RxAvailable(_) => Kind::RxAvailable,
        GaugeKey::TxTotal(_) => Kind::TxTotal,
        GaugeKey::TxPubtime(_) => Kind::TxPubtime,
    };
    // Every variant carries the name as its only payload.
    let name = match src {
        GaugeKey::SignalValue(name)
        | GaugeKey::SignalPubtime(name)
        | GaugeKey::RxAvailable(name)
        | GaugeKey::TxTotal(name)
        | GaugeKey::TxPubtime(name) => name,
    };

    nodo_pb::GaugeKey {
        kind: kind as i32,
        name: name.to_string(),
    }
}
/// Converts a gauge value into its protobuf form.
///
/// `Usize` values are widened to `u64`; `Pubtime` and `Acqtime` values are converted with
/// `to_proto_duration`.
fn to_proto_gauge_value(src: &GaugeValue) -> nodo_pb::GaugeValue {
    type Proto = nodo_pb::gauge_value::Value;

    let converted = match src {
        GaugeValue::Bool(flag) => Proto::Bool(*flag),
        GaugeValue::Int64(number) => Proto::Int64(*number),
        GaugeValue::Usize(number) => Proto::Usize(*number as u64),
        GaugeValue::Float64(number) => Proto::Float64(*number),
        GaugeValue::String(text) => Proto::String(text.clone()),
        GaugeValue::Pubtime(time) => Proto::Pubtime(to_proto_duration(time)),
        GaugeValue::Acqtime(time) => Proto::Acqtime(to_proto_duration(time)),
    };

    nodo_pb::GaugeValue {
        value: Some(converted),
    }
}
/// Maps a monitor status onto the protobuf variant of the same name.
fn to_proto_monitor_status(src: MonitorStatus) -> nodo_pb::MonitorStatus {
    type Proto = nodo_pb::MonitorStatus;

    match src {
        MonitorStatus::Nominal => Proto::Nominal,
        MonitorStatus::Warning => Proto::Warning,
        MonitorStatus::Critical => Proto::Critical,
    }
}
/// Converts accumulated samples into their protobuf form.
///
/// The total is reported in seconds; count, minimum and maximum are passed through as
/// returned by `src`.
fn to_proto_count_total(src: &CountTotal) -> nodo_pb::StatisticsSamples {
    let count = src.count();
    let total = src.total().as_secs_f32();
    let min_ms = src.min_ms();
    let max_ms = src.max_ms();

    nodo_pb::StatisticsSamples {
        count,
        total,
        min_ms,
        max_ms,
    }
}
/// Converts a [`Duration`] into its protobuf form of whole seconds plus sub-second nanoseconds.
///
/// The seconds are stored as `u32`. Durations longer than `u32::MAX` seconds are clamped to
/// `u32::MAX` seconds instead of silently wrapping around.
fn to_proto_duration(src: &Duration) -> nodo_pb::Duration {
    // Clamp first so that the narrowing cast below is lossless.
    let secs = src.as_secs().min(u64::from(u32::MAX)) as u32;

    nodo_pb::Duration {
        secs,
        nanos: src.subsec_nanos(),
    }
}
/// Converts a signal value into its protobuf form. `Usize` values are widened to `u64`.
fn to_proto_signal_value(src: &SignalValue) -> nodo_pb::node_signal::Value {
    type Proto = nodo_pb::node_signal::Value;

    match src {
        SignalValue::Bool(flag) => Proto::Bool(*flag),
        SignalValue::Int64(number) => Proto::Int64(*number),
        SignalValue::Usize(number) => Proto::Usize(*number as u64),
        SignalValue::Float64(number) => Proto::Float64(*number),
        SignalValue::String(text) => Proto::String(text.clone()),
    }
}
fn to_proto_lifecycle_status(ls: LifecycleStatus) -> nodo_pb::LifecycleStatus {
match ls {
LifecycleStatus::Inactive => nodo_pb::LifecycleStatus::Inactive,
LifecycleStatus::Starting => nodo_pb::LifecycleStatus::Starting,
LifecycleStatus::Running => nodo_pb::LifecycleStatus::Running,
LifecycleStatus::Pausing => nodo_pb::LifecycleStatus::Pausing,
LifecycleStatus::Paused => nodo_pb::LifecycleStatus::Paused,
LifecycleStatus::Resuming => nodo_pb::LifecycleStatus::Resuming,
LifecycleStatus::Stopping => nodo_pb::LifecycleStatus::Stopping,
LifecycleStatus::Error => nodo_pb::LifecycleStatus::Error,
}
}
pub fn proto_report_pretty_print(report: nodo_pb::Report) -> eyre::Result<()> {
struct Entry {
info: NodeInfo,
stats: AppStatistics,
}
let mut entries = match (report.meta, report.statistics) {
(Some(info), Some(statistics)) => {
if info.nodes.len() != statistics.nodes.len() {
bail!("invalid report: nodes/statistics len mismatch");
}
info.nodes
.into_iter()
.zip(statistics.nodes.into_iter())
.collect::<Vec<_>>()
}
_ => bail!("report missing info or statistics"),
};
entries.sort_by_key(|(_, stats)| {
(stats.transitions[Transition::Step.index()]
.duration
.unwrap()
.total
* 1000000.) as i64
});
println!("");
println!("+----+--------------------------+----------------------------------+--------+--------+----------------------+-------+----------------------+--------+---------+");
println!("| ID | NAME | TYPE | STEP Duration Period | START |");
println!("| | | | Skipped| Count | (min-avg-max) [ms] | Total | (min-avg-max) [ms] | Count | D [ms] |");
println!("+----+--------------------------+----------------------------------+--------+--------+----------------------+-------+----------------------+--------+---------+");
for (
nodo_pb::NodeMeta {
id,
node_name,
typename,
..
},
nodo_pb::NodeStatistics { transitions, .. },
) in entries.into_iter().rev()
{
let transitions_step = transitions[Transition::Step.index()];
let transitions_step_duration = transitions_step
.duration
.ok_or_else(|| eyre!("invalid report"))?;
let transitions_step_period = transitions_step
.period
.ok_or_else(|| eyre!("invalid report"))?;
let transitions_start = transitions[Transition::Start.index()];
let transitions_start_duration = transitions_start
.duration
.ok_or_else(|| eyre!("invalid report"))?;
println!(
"|{:3} | {:024} | {:032} | {:6} | {:6} | {} {} {} |{} | {} {} {} | {:2} /{:2} | {} |",
format!("{id:>3}"),
cut_middle(&node_name, 24),
cut_middle(&typename, 32),
transitions_step.skipped_count,
transitions_step_duration.count,
transitions_step_duration
.min_ms
.map(|dt| format!("{:>6.2}", dt))
.unwrap_or("------".to_string()),
average_ms(&transitions_step_duration)
.map(|dt| format!("{:>6.2}", dt))
.unwrap_or("------".to_string()),
transitions_step_duration
.max_ms
.map(|dt| format!("{:>6.2}", dt))
.unwrap_or("------".to_string()),
format!("{:>6.2}", transitions_step_duration.total),
transitions_step_period
.min_ms
.map(|dt| format!("{:>6.2}", dt))
.unwrap_or("------".to_string()),
average_ms(&transitions_step_period)
.map(|dt| format!("{:>6.2}", dt))
.unwrap_or("------".to_string()),
transitions_step_period
.max_ms
.map(|dt| format!("{:>6.2}", dt))
.unwrap_or("------".to_string()),
transitions_start.skipped_count,
transitions_start_duration.count,
average_ms(&transitions_start_duration)
.map(|dt| format!("{:>7.2}", dt))
.unwrap_or("-------".to_string()),
);
}
println!("+----+--------------------------+----------------------------------+--------+--------+----------------------+-------+----------------------+--------+---------+");
Ok(())
}
/// Shortens `text` to `len` characters by replacing its middle with `..`.
///
/// The text is returned unchanged if it has at most `len` characters or if `len` is 6 or
/// less. Otherwise the result is the first two characters, followed by `..`, followed by the
/// last `len - 4` characters.
///
/// Lengths are counted in characters rather than bytes, so multi-byte UTF-8 text is measured
/// by what is displayed and is never cut inside a character.
fn cut_middle(text: &str, len: usize) -> String {
    let char_count = text.chars().count();
    if char_count <= len || len <= 6 {
        return text.to_string();
    }

    // Translate character positions into byte offsets which are safe to slice at.
    let byte_offset = |char_pos: usize| {
        text.char_indices()
            .nth(char_pos)
            .map_or(text.len(), |(offset, _)| offset)
    };
    let head_end = byte_offset(2);
    // `char_count > len > 6` holds here, thus the subtraction cannot underflow.
    let tail_start = byte_offset(char_count - (len - 4));

    format!("{}..{}", &text[..head_end], &text[tail_start..])
}
/// Average of the samples: `total` scaled by 1000 and divided by `count`.
///
/// Returns `None` if no sample was recorded.
fn average_ms(sample: &nodo_pb::StatisticsSamples) -> Option<f32> {
    if sample.count > 0 {
        Some(1000.0 * sample.total / sample.count as f32)
    } else {
        None
    }
}