nodo/
app.rs

1use crate::{
2    codelet::{
3        Clocks, LifecycleStatus, NodeId, NodeletSetup, ScheduleBuilder, ScheduleId,
4        SharedNodeCrumbs, Vise, ViseTrait,
5    },
6    core::Clock,
7    opt_vec::OptVec,
8    prelude::SharedAppMonitor,
9};
10use eyre::eyre;
11use serde::{Deserialize, Serialize};
12use slab::Slab;
13use std::{
14    collections::HashMap,
15    sync::{Arc, RwLock},
16    time::Duration,
17};
18
19pub struct App {
20    clocks: Clocks,
21    nodelet_monitor: SharedAppMonitor,
22    schedule_monitor: SharedScheduleMonitor,
23    nodes: Slab<Arc<NodeInfo>>,
24    crumbs: HashMap<String, SharedNodeCrumbs>,
25    schedules: Slab<Arc<ScheduleInfo>>,
26}
27
28impl App {
29    pub fn new(nodelet_monitor: SharedAppMonitor, schedule_monitor: SharedScheduleMonitor) -> Self {
30        Self {
31            clocks: Clocks::new(),
32            nodelet_monitor,
33            schedule_monitor,
34            nodes: Default::default(),
35            crumbs: Default::default(),
36            schedules: Default::default(),
37        }
38    }
39
40    /// Creates an context used to add a new schedule to the app
41    pub fn schedule_setup_context<'a>(
42        &'a mut self,
43        builder: &ScheduleBuilder,
44    ) -> AppSetupScheduleContext<'a> {
45        AppSetupScheduleContext::new(self, builder)
46    }
47
48    pub fn nodelet_monitor(&self) -> &SharedAppMonitor {
49        &self.nodelet_monitor
50    }
51
52    pub fn schedule_monitor(&self) -> &SharedScheduleMonitor {
53        &self.schedule_monitor
54    }
55
56    pub fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &Arc<NodeInfo>)> {
57        self.nodes.iter().map(|(k, v)| (NodeId(k), v))
58    }
59
60    pub fn iter_schedules(&self) -> impl Iterator<Item = (ScheduleId, &Arc<ScheduleInfo>)> {
61        self.schedules.iter().map(|(k, v)| (ScheduleId(k), v))
62    }
63
64    fn has_node_with_name(&self, node_name: &str) -> bool {
65        self.nodes
66            .iter()
67            .find(|(_, info)| info.node_name == node_name)
68            .is_some()
69    }
70
71    fn get_or_add_schedule_crumbs(&mut self, schedule_name: &str) -> SharedNodeCrumbs {
72        match self.crumbs.get(schedule_name) {
73            Some(crumbs) => crumbs.clone(),
74            None => {
75                let crumbs = SharedNodeCrumbs::new();
76                self.crumbs
77                    .insert(String::from(schedule_name), crumbs.clone());
78                crumbs
79            }
80        }
81    }
82
83    fn insert_node(&mut self, info: Arc<NodeInfo>) -> NodeId {
84        let idx = self.nodes.insert(info);
85        NodeId(idx)
86    }
87
88    fn insert_schedule(&mut self, info: Arc<ScheduleInfo>) -> ScheduleId {
89        let idx = self.schedules.insert(info);
90        ScheduleId(idx)
91    }
92
93    pub fn check_for_stalled_schedules(&self) {
94        let now = self.clocks.app_mono.now();
95        for (_, crumbs) in self.crumbs.iter() {
96            if let Some(crumbs) = crumbs.read() {
97                let duration = now.saturating_sub(*crumbs.time).as_secs_f64();
98                if duration > 1.0 {
99                    let name = &self.nodes[crumbs.node_id.0].node_name;
100                    log::warn!(
101                        "execution stalled: name={}, transition={:?}, duration={}",
102                        name,
103                        crumbs.transition,
104                        duration,
105                    );
106                }
107            }
108        }
109    }
110}
111
112/// Metadata information about a node
113#[derive(Default, Clone, Serialize, Deserialize)]
114pub struct NodeInfo {
115    pub schedule_name: String,
116    pub sequence_name: String,
117    pub node_name: String,
118    pub typename: String,
119    pub rx_names: Vec<String>,
120    pub tx_names: Vec<String>,
121    pub signal_names: Vec<String>,
122}
123
124impl NodeInfo {
125    /// Finds index of an RX channel by name
126    pub fn rx_index_by_name(&self, needle: &str) -> Option<usize> {
127        self.rx_names.iter().position(|name| name == needle)
128    }
129
130    /// Finds index of an TX channel by name
131    pub fn tx_index_by_name(&self, needle: &str) -> Option<usize> {
132        self.tx_names.iter().position(|name| name == needle)
133    }
134
135    /// Finds index of a signal by name
136    pub fn signal_index_by_name(&self, needle: &str) -> Option<usize> {
137        self.signal_names.iter().position(|name| name == needle)
138    }
139}
140
141#[derive(Default, Clone, Serialize, Deserialize)]
142pub struct ScheduleInfo {
143    pub name: String,
144    pub period: Option<Duration>,
145}
146
147pub struct AppSetupScheduleContext<'a> {
148    app: &'a mut App,
149    id: ScheduleId,
150    schedule_name: String,
151    sequence_name: Option<String>,
152    crumbs: SharedNodeCrumbs,
153}
154
155impl<'a> AppSetupScheduleContext<'a> {
156    pub fn new(app: &'a mut App, builder: &ScheduleBuilder) -> Self {
157        let crumbs = app.get_or_add_schedule_crumbs(&builder.name);
158
159        let id = app.insert_schedule(Arc::new(ScheduleInfo {
160            name: builder.name.clone(),
161            period: builder.period,
162        }));
163
164        app.schedule_monitor.insert(id);
165
166        Self {
167            app,
168            id,
169            schedule_name: builder.name.clone(),
170            sequence_name: None,
171            crumbs,
172        }
173    }
174
175    pub fn id(&self) -> ScheduleId {
176        self.id
177    }
178
179    pub fn on_begin_sequence(&mut self, sequence_name: String) {
180        self.sequence_name = Some(sequence_name);
181    }
182
183    pub fn on_end_sequence(&mut self) {
184        self.sequence_name = None;
185    }
186
187    pub fn on_node(&mut self, vise: &mut Vise) {
188        let node_name = vise.name().to_string();
189
190        if self.app.has_node_with_name(&node_name) {
191            log::warn!("duplicate node name: {node_name}");
192        }
193
194        let info = Arc::new(NodeInfo {
195            schedule_name: self.schedule_name.clone(),
196            sequence_name: self
197                .sequence_name
198                .clone()
199                .expect("internal error: on_begin_sequence must be called before on_node"),
200            node_name,
201            typename: vise.type_name().to_string(),
202            rx_names: vise
203                .rx_names()
204                .into_iter()
205                .map(ToString::to_string)
206                .collect(),
207            tx_names: vise
208                .tx_names()
209                .into_iter()
210                .map(ToString::to_string)
211                .collect(),
212            signal_names: vise
213                .signal_names()
214                .into_iter()
215                .map(ToString::to_string)
216                .collect(),
217        });
218
219        let node_id = self.app.insert_node(info.clone());
220
221        vise.setup(NodeletSetup {
222            info,
223            clocks: self.app.clocks.clone(),
224            node_id,
225            monitor: self.app.nodelet_monitor.clone(),
226            crumbs: self.crumbs.clone(),
227        });
228    }
229
230    pub fn monitor(&self) -> SharedAppMonitor {
231        self.app.nodelet_monitor.clone()
232    }
233}
234
235#[derive(Clone)]
236pub struct SharedScheduleMonitor(Arc<RwLock<ScheduleMonitor>>);
237
238impl SharedScheduleMonitor {
239    pub fn new() -> Self {
240        Self(Arc::new(RwLock::new(ScheduleMonitor::default())))
241    }
242
243    pub fn insert(&self, id: ScheduleId) {
244        let mut m = self.0.write().unwrap();
245        m.schedules
246            .insert(id.into(), ScheduleMonitorEntry::default());
247    }
248
249    pub fn edit<F>(&self, id: ScheduleId, f: F) -> eyre::Result<()>
250    where
251        F: Fn(&mut ScheduleMonitorEntry),
252    {
253        let mut m = self.0.write().map_err(|_| eyre!("lock poisoned"))?;
254        let m = m
255            .get_mut(id)
256            .ok_or_else(|| eyre!("invalid schedule ID: {id:?}"))?;
257        f(m);
258        Ok(())
259    }
260
261    pub fn to_data(&self) -> eyre::Result<ScheduleMonitor> {
262        Ok(self.0.read().map_err(|_| eyre!("lock poisoned"))?.clone())
263    }
264}
265
266#[derive(Default, Clone)]
267pub struct ScheduleMonitor {
268    schedules: OptVec<ScheduleMonitorEntry>,
269}
270
271impl ScheduleMonitor {
272    pub fn iter(&self) -> impl Iterator<Item = (ScheduleId, &ScheduleMonitorEntry)> {
273        self.schedules.iter().map(|(k, v)| (ScheduleId(k), v))
274    }
275
276    pub fn get(&self, id: ScheduleId) -> Option<&ScheduleMonitorEntry> {
277        self.schedules.get(id.into())
278    }
279
280    pub fn get_mut(&mut self, id: ScheduleId) -> Option<&mut ScheduleMonitorEntry> {
281        self.schedules.get_mut(id.into())
282    }
283}
284
285#[derive(Default, Clone)]
286pub struct ScheduleMonitorEntry {
287    pub lifecycle_status: LifecycleStatus,
288    pub last_period: Option<Duration>,
289    pub last_error: Option<String>,
290    pub has_panicked: bool,
291    pub has_finished: bool,
292}