1use crate::{
2 codelet::{
3 Clocks, LifecycleStatus, NodeId, NodeletSetup, ScheduleBuilder, ScheduleId,
4 SharedNodeCrumbs, Vise, ViseTrait,
5 },
6 core::Clock,
7 opt_vec::OptVec,
8 prelude::SharedAppMonitor,
9};
10use eyre::eyre;
11use serde::{Deserialize, Serialize};
12use slab::Slab;
13use std::{
14 collections::HashMap,
15 sync::{Arc, RwLock},
16 time::Duration,
17};
18
19pub struct App {
20 clocks: Clocks,
21 nodelet_monitor: SharedAppMonitor,
22 schedule_monitor: SharedScheduleMonitor,
23 nodes: Slab<Arc<NodeInfo>>,
24 crumbs: HashMap<String, SharedNodeCrumbs>,
25 schedules: Slab<Arc<ScheduleInfo>>,
26}
27
28impl App {
29 pub fn new(nodelet_monitor: SharedAppMonitor, schedule_monitor: SharedScheduleMonitor) -> Self {
30 Self {
31 clocks: Clocks::new(),
32 nodelet_monitor,
33 schedule_monitor,
34 nodes: Default::default(),
35 crumbs: Default::default(),
36 schedules: Default::default(),
37 }
38 }
39
40 pub fn schedule_setup_context<'a>(
42 &'a mut self,
43 builder: &ScheduleBuilder,
44 ) -> AppSetupScheduleContext<'a> {
45 AppSetupScheduleContext::new(self, builder)
46 }
47
48 pub fn nodelet_monitor(&self) -> &SharedAppMonitor {
49 &self.nodelet_monitor
50 }
51
52 pub fn schedule_monitor(&self) -> &SharedScheduleMonitor {
53 &self.schedule_monitor
54 }
55
56 pub fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &Arc<NodeInfo>)> {
57 self.nodes.iter().map(|(k, v)| (NodeId(k), v))
58 }
59
60 pub fn iter_schedules(&self) -> impl Iterator<Item = (ScheduleId, &Arc<ScheduleInfo>)> {
61 self.schedules.iter().map(|(k, v)| (ScheduleId(k), v))
62 }
63
64 fn has_node_with_name(&self, node_name: &str) -> bool {
65 self.nodes
66 .iter()
67 .find(|(_, info)| info.node_name == node_name)
68 .is_some()
69 }
70
71 fn get_or_add_schedule_crumbs(&mut self, schedule_name: &str) -> SharedNodeCrumbs {
72 match self.crumbs.get(schedule_name) {
73 Some(crumbs) => crumbs.clone(),
74 None => {
75 let crumbs = SharedNodeCrumbs::new();
76 self.crumbs
77 .insert(String::from(schedule_name), crumbs.clone());
78 crumbs
79 }
80 }
81 }
82
83 fn insert_node(&mut self, info: Arc<NodeInfo>) -> NodeId {
84 let idx = self.nodes.insert(info);
85 NodeId(idx)
86 }
87
88 fn insert_schedule(&mut self, info: Arc<ScheduleInfo>) -> ScheduleId {
89 let idx = self.schedules.insert(info);
90 ScheduleId(idx)
91 }
92
93 pub fn check_for_stalled_schedules(&self) {
94 let now = self.clocks.app_mono.now();
95 for (_, crumbs) in self.crumbs.iter() {
96 if let Some(crumbs) = crumbs.read() {
97 let duration = now.saturating_sub(*crumbs.time).as_secs_f64();
98 if duration > 1.0 {
99 let name = &self.nodes[crumbs.node_id.0].node_name;
100 log::warn!(
101 "execution stalled: name={}, transition={:?}, duration={}",
102 name,
103 crumbs.transition,
104 duration,
105 );
106 }
107 }
108 }
109 }
110}
111
112#[derive(Default, Clone, Serialize, Deserialize)]
114pub struct NodeInfo {
115 pub schedule_name: String,
116 pub sequence_name: String,
117 pub node_name: String,
118 pub typename: String,
119 pub rx_names: Vec<String>,
120 pub tx_names: Vec<String>,
121 pub signal_names: Vec<String>,
122}
123
124impl NodeInfo {
125 pub fn rx_index_by_name(&self, needle: &str) -> Option<usize> {
127 self.rx_names.iter().position(|name| name == needle)
128 }
129
130 pub fn tx_index_by_name(&self, needle: &str) -> Option<usize> {
132 self.tx_names.iter().position(|name| name == needle)
133 }
134
135 pub fn signal_index_by_name(&self, needle: &str) -> Option<usize> {
137 self.signal_names.iter().position(|name| name == needle)
138 }
139}
140
141#[derive(Default, Clone, Serialize, Deserialize)]
142pub struct ScheduleInfo {
143 pub name: String,
144 pub period: Option<Duration>,
145}
146
147pub struct AppSetupScheduleContext<'a> {
148 app: &'a mut App,
149 id: ScheduleId,
150 schedule_name: String,
151 sequence_name: Option<String>,
152 crumbs: SharedNodeCrumbs,
153}
154
155impl<'a> AppSetupScheduleContext<'a> {
156 pub fn new(app: &'a mut App, builder: &ScheduleBuilder) -> Self {
157 let crumbs = app.get_or_add_schedule_crumbs(&builder.name);
158
159 let id = app.insert_schedule(Arc::new(ScheduleInfo {
160 name: builder.name.clone(),
161 period: builder.period,
162 }));
163
164 app.schedule_monitor.insert(id);
165
166 Self {
167 app,
168 id,
169 schedule_name: builder.name.clone(),
170 sequence_name: None,
171 crumbs,
172 }
173 }
174
175 pub fn id(&self) -> ScheduleId {
176 self.id
177 }
178
179 pub fn on_begin_sequence(&mut self, sequence_name: String) {
180 self.sequence_name = Some(sequence_name);
181 }
182
183 pub fn on_end_sequence(&mut self) {
184 self.sequence_name = None;
185 }
186
187 pub fn on_node(&mut self, vise: &mut Vise) {
188 let node_name = vise.name().to_string();
189
190 if self.app.has_node_with_name(&node_name) {
191 log::warn!("duplicate node name: {node_name}");
192 }
193
194 let info = Arc::new(NodeInfo {
195 schedule_name: self.schedule_name.clone(),
196 sequence_name: self
197 .sequence_name
198 .clone()
199 .expect("internal error: on_begin_sequence must be called before on_node"),
200 node_name,
201 typename: vise.type_name().to_string(),
202 rx_names: vise
203 .rx_names()
204 .into_iter()
205 .map(ToString::to_string)
206 .collect(),
207 tx_names: vise
208 .tx_names()
209 .into_iter()
210 .map(ToString::to_string)
211 .collect(),
212 signal_names: vise
213 .signal_names()
214 .into_iter()
215 .map(ToString::to_string)
216 .collect(),
217 });
218
219 let node_id = self.app.insert_node(info.clone());
220
221 vise.setup(NodeletSetup {
222 info,
223 clocks: self.app.clocks.clone(),
224 node_id,
225 monitor: self.app.nodelet_monitor.clone(),
226 crumbs: self.crumbs.clone(),
227 });
228 }
229
230 pub fn monitor(&self) -> SharedAppMonitor {
231 self.app.nodelet_monitor.clone()
232 }
233}
234
235#[derive(Clone)]
236pub struct SharedScheduleMonitor(Arc<RwLock<ScheduleMonitor>>);
237
238impl SharedScheduleMonitor {
239 pub fn new() -> Self {
240 Self(Arc::new(RwLock::new(ScheduleMonitor::default())))
241 }
242
243 pub fn insert(&self, id: ScheduleId) {
244 let mut m = self.0.write().unwrap();
245 m.schedules
246 .insert(id.into(), ScheduleMonitorEntry::default());
247 }
248
249 pub fn edit<F>(&self, id: ScheduleId, f: F) -> eyre::Result<()>
250 where
251 F: Fn(&mut ScheduleMonitorEntry),
252 {
253 let mut m = self.0.write().map_err(|_| eyre!("lock poisoned"))?;
254 let m = m
255 .get_mut(id)
256 .ok_or_else(|| eyre!("invalid schedule ID: {id:?}"))?;
257 f(m);
258 Ok(())
259 }
260
261 pub fn to_data(&self) -> eyre::Result<ScheduleMonitor> {
262 Ok(self.0.read().map_err(|_| eyre!("lock poisoned"))?.clone())
263 }
264}
265
266#[derive(Default, Clone)]
267pub struct ScheduleMonitor {
268 schedules: OptVec<ScheduleMonitorEntry>,
269}
270
271impl ScheduleMonitor {
272 pub fn iter(&self) -> impl Iterator<Item = (ScheduleId, &ScheduleMonitorEntry)> {
273 self.schedules.iter().map(|(k, v)| (ScheduleId(k), v))
274 }
275
276 pub fn get(&self, id: ScheduleId) -> Option<&ScheduleMonitorEntry> {
277 self.schedules.get(id.into())
278 }
279
280 pub fn get_mut(&mut self, id: ScheduleId) -> Option<&mut ScheduleMonitorEntry> {
281 self.schedules.get_mut(id.into())
282 }
283}
284
285#[derive(Default, Clone)]
286pub struct ScheduleMonitorEntry {
287 pub lifecycle_status: LifecycleStatus,
288 pub last_period: Option<Duration>,
289 pub last_error: Option<String>,
290 pub has_panicked: bool,
291 pub has_finished: bool,
292}