use crate::{
codelet::{
Clocks, LifecycleStatus, NodeId, NodeletSetup, ScheduleBuilder, ScheduleId,
SharedNodeCrumbs, Vise, ViseTrait,
},
core::Clock,
opt_vec::OptVec,
prelude::SharedAppMonitor,
};
use eyre::eyre;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
time::Duration,
};
/// Central registry of an application's nodes, schedules, breadcrumbs, and shared monitors.
pub struct App {
/// Clocks cloned into every node's setup; `app_mono` is also read when checking for stalls.
clocks: Clocks,
/// Shared node-level monitor; a clone is handed to each node during setup.
nodelet_monitor: SharedAppMonitor,
/// Shared schedule-level monitor; every newly registered schedule is inserted into it.
schedule_monitor: SharedScheduleMonitor,
/// Registered nodes; the slab key is the value wrapped by `NodeId`.
nodes: Slab<Arc<NodeInfo>>,
/// Execution breadcrumbs, keyed by schedule name.
crumbs: HashMap<String, SharedNodeCrumbs>,
/// Registered schedules; the slab key is the value wrapped by `ScheduleId`.
schedules: Slab<Arc<ScheduleInfo>>,
}
impl App {
    /// Creates an app with fresh clocks, the given monitors, and no registered nodes,
    /// breadcrumbs, or schedules.
    pub fn new(nodelet_monitor: SharedAppMonitor, schedule_monitor: SharedScheduleMonitor) -> Self {
        Self {
            clocks: Clocks::new(),
            nodelet_monitor,
            schedule_monitor,
            nodes: Default::default(),
            crumbs: Default::default(),
            schedules: Default::default(),
        }
    }

    /// Registers the schedule described by `builder` and returns the context used to
    /// register its nodes. The context borrows the app mutably for its whole lifetime.
    pub fn schedule_setup_context(
        &mut self,
        builder: &ScheduleBuilder,
    ) -> AppSetupScheduleContext<'_> {
        AppSetupScheduleContext::new(self, builder)
    }

    /// Returns the shared node-level monitor.
    pub fn nodelet_monitor(&self) -> &SharedAppMonitor {
        &self.nodelet_monitor
    }

    /// Returns the shared schedule-level monitor.
    pub fn schedule_monitor(&self) -> &SharedScheduleMonitor {
        &self.schedule_monitor
    }

    /// Iterates over all registered nodes together with their ids.
    pub fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &Arc<NodeInfo>)> {
        self.nodes.iter().map(|(key, info)| (NodeId(key), info))
    }

    /// Iterates over all registered schedules together with their ids.
    pub fn iter_schedules(&self) -> impl Iterator<Item = (ScheduleId, &Arc<ScheduleInfo>)> {
        self.schedules
            .iter()
            .map(|(key, info)| (ScheduleId(key), info))
    }

    /// Returns `true` if any registered node is named `node_name`.
    fn has_node_with_name(&self, node_name: &str) -> bool {
        self.nodes
            .iter()
            .any(|(_, info)| info.node_name == node_name)
    }

    /// Returns the breadcrumbs stored under `schedule_name`, creating and storing a new
    /// set first if the name has not been seen before.
    fn get_or_add_schedule_crumbs(&mut self, schedule_name: &str) -> SharedNodeCrumbs {
        // Look up by `&str` first so the common "already present" path does not allocate a key.
        if let Some(existing) = self.crumbs.get(schedule_name) {
            return existing.clone();
        }
        let created = SharedNodeCrumbs::new();
        self.crumbs
            .insert(String::from(schedule_name), created.clone());
        created
    }

    /// Stores `info` and returns the id assigned to it.
    fn insert_node(&mut self, info: Arc<NodeInfo>) -> NodeId {
        NodeId(self.nodes.insert(info))
    }

    /// Stores `info` and returns the id assigned to it.
    fn insert_schedule(&mut self, info: Arc<ScheduleInfo>) -> ScheduleId {
        ScheduleId(self.schedules.insert(info))
    }

    /// Logs a warning for every schedule whose current breadcrumb is more than one second
    /// older than the current `app_mono` clock reading.
    ///
    /// Schedules whose breadcrumbs yield nothing on `read()` are skipped. If a breadcrumb
    /// refers to a node id that is not registered, the warning is still emitted with the
    /// name `<unknown>` instead of panicking.
    pub fn check_for_stalled_schedules(&self) {
        /// Breadcrumb age, in seconds, above which a schedule is reported as stalled.
        const STALL_THRESHOLD_SECS: f64 = 1.0;

        let now = self.clocks.app_mono.now();
        for shared in self.crumbs.values() {
            if let Some(crumbs) = shared.read() {
                let duration = now.saturating_sub(*crumbs.time).as_secs_f64();
                if duration > STALL_THRESHOLD_SECS {
                    // Checked lookup: a diagnostics routine must not bring the app down
                    // because a breadcrumb carries an id this registry does not know.
                    let name = self
                        .nodes
                        .get(crumbs.node_id.0)
                        .map_or("<unknown>", |info| info.node_name.as_str());
                    log::warn!(
                        "execution stalled: name={}, transition={:?}, duration={}",
                        name,
                        crumbs.transition,
                        duration,
                    );
                }
            }
        }
    }
}
/// Static description of a single node, captured when the node is registered.
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
/// Name of the schedule the node was registered under.
pub schedule_name: String,
/// Name of the sequence that was active when the node was registered.
pub sequence_name: String,
/// Name reported by the node itself.
pub node_name: String,
/// Type name reported by the node.
pub typename: String,
/// Names reported by the node's `rx_names()` — presumably its receiving channels; confirm.
pub rx_names: Vec<String>,
/// Names reported by the node's `tx_names()` — presumably its transmitting channels; confirm.
pub tx_names: Vec<String>,
/// Names reported by the node's `signal_names()`.
pub signal_names: Vec<String>,
}
impl NodeInfo {
    /// Position of the first entry in `names` equal to `needle`, if any.
    fn index_of(names: &[String], needle: &str) -> Option<usize> {
        names
            .iter()
            .position(|candidate| candidate.as_str() == needle)
    }

    /// Index of the first entry in `rx_names` equal to `needle`, or `None` if absent.
    pub fn rx_index_by_name(&self, needle: &str) -> Option<usize> {
        Self::index_of(&self.rx_names, needle)
    }

    /// Index of the first entry in `tx_names` equal to `needle`, or `None` if absent.
    pub fn tx_index_by_name(&self, needle: &str) -> Option<usize> {
        Self::index_of(&self.tx_names, needle)
    }

    /// Index of the first entry in `signal_names` equal to `needle`, or `None` if absent.
    pub fn signal_index_by_name(&self, needle: &str) -> Option<usize> {
        Self::index_of(&self.signal_names, needle)
    }
}
/// Static description of a schedule, copied from its `ScheduleBuilder` at registration.
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct ScheduleInfo {
/// Schedule name, as given by the builder.
pub name: String,
/// Period given by the builder, if any — presumably the intended execution interval; confirm.
pub period: Option<Duration>,
}
/// Context used while setting up one schedule: it registers the schedule's nodes with the
/// borrowed `App` and tracks which sequence is currently being described.
pub struct AppSetupScheduleContext<'a> {
/// The app that nodes are registered with.
app: &'a mut App,
/// Id assigned to this schedule when the context was created.
id: ScheduleId,
/// Name of this schedule, copied from the builder.
schedule_name: String,
/// Name of the sequence currently open, or `None` outside of a sequence.
sequence_name: Option<String>,
/// Breadcrumbs for this schedule; a clone is handed to every node set up through this context.
crumbs: SharedNodeCrumbs,
}
impl<'a> AppSetupScheduleContext<'a> {
    /// Registers the schedule described by `builder` with `app` and its schedule monitor,
    /// and returns a context for registering that schedule's nodes.
    pub fn new(app: &'a mut App, builder: &ScheduleBuilder) -> Self {
        let crumbs = app.get_or_add_schedule_crumbs(&builder.name);
        let schedule_info = ScheduleInfo {
            name: builder.name.clone(),
            period: builder.period,
        };
        let id = app.insert_schedule(Arc::new(schedule_info));
        app.schedule_monitor.insert(id);
        let schedule_name = builder.name.clone();
        Self {
            app,
            id,
            schedule_name,
            sequence_name: None,
            crumbs,
        }
    }

    /// Id assigned to the schedule this context belongs to.
    pub fn id(&self) -> ScheduleId {
        self.id
    }

    /// Marks `sequence_name` as the sequence that subsequently registered nodes belong to.
    pub fn on_begin_sequence(&mut self, sequence_name: String) {
        self.sequence_name = Some(sequence_name);
    }

    /// Clears the current sequence; `on_node` must not be called until a new one begins.
    pub fn on_end_sequence(&mut self) {
        self.sequence_name = None;
    }

    /// Registers the node held by `vise` with the app and runs the node's setup.
    ///
    /// A warning is logged if another node with the same name is already registered; the
    /// node is registered regardless.
    ///
    /// # Panics
    /// Panics if no sequence is open, i.e. `on_begin_sequence` was not called beforehand.
    pub fn on_node(&mut self, vise: &mut Vise) {
        let node_name = vise.name().to_string();
        if self.app.has_node_with_name(&node_name) {
            log::warn!("duplicate node name: {node_name}");
        }

        // Gather the description piece by piece, in the same order as the fields are used.
        let schedule_name = self.schedule_name.clone();
        let sequence_name = self
            .sequence_name
            .clone()
            .expect("internal error: on_begin_sequence must be called before on_node");
        let typename = vise.type_name().to_string();
        let rx_names: Vec<String> = vise
            .rx_names()
            .into_iter()
            .map(ToString::to_string)
            .collect();
        let tx_names: Vec<String> = vise
            .tx_names()
            .into_iter()
            .map(ToString::to_string)
            .collect();
        let signal_names: Vec<String> = vise
            .signal_names()
            .into_iter()
            .map(ToString::to_string)
            .collect();

        let info = Arc::new(NodeInfo {
            schedule_name,
            sequence_name,
            node_name,
            typename,
            rx_names,
            tx_names,
            signal_names,
        });

        // The registry and the node share the same description.
        let node_id = self.app.insert_node(Arc::clone(&info));
        let setup = NodeletSetup {
            info,
            clocks: self.app.clocks.clone(),
            node_id,
            monitor: self.app.nodelet_monitor.clone(),
            crumbs: self.crumbs.clone(),
        };
        vise.setup(setup);
    }

    /// Returns a clone of the app's shared node-level monitor.
    pub fn monitor(&self) -> SharedAppMonitor {
        self.app.nodelet_monitor.clone()
    }
}
/// Cloneable handle to a `ScheduleMonitor` held in an `Arc<RwLock<_>>`; clones refer to the
/// same underlying monitor.
#[derive(Clone)]
pub struct SharedScheduleMonitor(Arc<RwLock<ScheduleMonitor>>);
impl SharedScheduleMonitor {
    /// Creates a handle to a new monitor with no entries.
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(ScheduleMonitor::default())))
    }

    /// Stores a default entry for the schedule `id`.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn insert(&self, id: ScheduleId) {
        let mut monitor = self.0.write().expect("schedule monitor lock poisoned");
        monitor
            .schedules
            .insert(id.into(), ScheduleMonitorEntry::default());
    }

    /// Runs `f` on the entry of schedule `id` while holding the write lock.
    ///
    /// `f` is invoked at most once, so any `FnOnce` closure is accepted (this includes
    /// every `Fn` and `FnMut` closure).
    ///
    /// # Errors
    /// Returns an error if the lock is poisoned or if there is no entry for `id`; `f` is
    /// not called in either case.
    pub fn edit<F>(&self, id: ScheduleId, f: F) -> eyre::Result<()>
    where
        F: FnOnce(&mut ScheduleMonitorEntry),
    {
        let mut monitor = self.0.write().map_err(|_| eyre!("lock poisoned"))?;
        let entry = monitor
            .get_mut(id)
            .ok_or_else(|| eyre!("invalid schedule ID: {id:?}"))?;
        f(entry);
        Ok(())
    }

    /// Returns a snapshot (clone) of the monitor's current contents.
    ///
    /// # Errors
    /// Returns an error if the lock is poisoned.
    pub fn to_data(&self) -> eyre::Result<ScheduleMonitor> {
        let monitor = self.0.read().map_err(|_| eyre!("lock poisoned"))?;
        Ok(monitor.clone())
    }
}
/// Collection of per-schedule monitor entries.
#[derive(Default, Clone)]
pub struct ScheduleMonitor {
/// Entries addressed by the index a `ScheduleId` converts into.
schedules: OptVec<ScheduleMonitorEntry>,
}
impl ScheduleMonitor {
    /// Iterates over the stored entries, pairing each with the `ScheduleId` built from its
    /// index.
    pub fn iter(&self) -> impl Iterator<Item = (ScheduleId, &ScheduleMonitorEntry)> {
        let entries = self.schedules.iter();
        entries.map(|(index, entry)| (ScheduleId(index), entry))
    }

    /// Returns the entry for `id`, or `None` if there is none.
    pub fn get(&self, id: ScheduleId) -> Option<&ScheduleMonitorEntry> {
        let index = id.into();
        self.schedules.get(index)
    }

    /// Returns the entry for `id` for modification, or `None` if there is none.
    pub fn get_mut(&mut self, id: ScheduleId) -> Option<&mut ScheduleMonitorEntry> {
        let index = id.into();
        self.schedules.get_mut(index)
    }
}
/// Monitoring state recorded for a single schedule.
///
/// NOTE(review): the code that writes these fields is not in this part of the file; the
/// field descriptions below are read off the names and types and should be confirmed.
#[derive(Default, Clone)]
pub struct ScheduleMonitorEntry {
/// Lifecycle status of the schedule.
pub lifecycle_status: LifecycleStatus,
/// Most recently recorded period, if any — presumably a measured execution interval; confirm.
pub last_period: Option<Duration>,
/// Text of the most recently recorded error, if any.
pub last_error: Option<String>,
/// Presumably set once the schedule has panicked; confirm with the writer.
pub has_panicked: bool,
/// Presumably set once the schedule has finished; confirm with the writer.
pub has_finished: bool,
}