1use crate::{
4 channels::{FlushResult, RxBundle, SyncResult, TxBundle},
5 codelet::{
6 Codelet, CodeletPulse, CodeletStatus, Context, Lifecycle, LifecycleStatus, Statistics,
7 TaskClocks, Transition,
8 },
9 config::{Config, ConfigAux},
10 core::*,
11 monitors::SharedAppMonitor,
12 signals::Signals,
13};
14use eyre::{eyre, Result};
15use serde::{Deserialize, Serialize};
16use std::sync::{Arc, RwLock};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct NodeId(pub usize);
21
22#[derive(Clone)]
23pub struct SharedNodeCrumbs {
24 inner: Arc<RwLock<Option<NodeCrumbs>>>,
25}
26
27impl SharedNodeCrumbs {
28 pub fn new() -> Self {
29 Self {
30 inner: Arc::new(RwLock::new(None)),
31 }
32 }
33
34 pub fn on_begin(&self, time: Pubtime, node_id: NodeId, transition: Transition) {
35 *self.inner.write().unwrap() = Some(NodeCrumbs {
37 time,
38 node_id,
39 transition,
40 });
41 }
42
43 pub fn on_end(&self) {
44 *self.inner.write().unwrap() = None;
46 }
47
48 pub fn read(&self) -> Option<NodeCrumbs> {
49 self.inner.read().unwrap().clone()
51 }
52}
53
54#[derive(Clone)]
55pub struct NodeCrumbs {
56 pub time: Pubtime,
57 pub node_id: NodeId,
58 pub transition: Transition,
59}
60
61pub struct CodeletInstance<C: Codelet> {
63 pub id: Option<NodeId>,
64 pub crumbs: Option<SharedNodeCrumbs>,
65
66 pub name: String,
67 pub state: C,
68 pub config: C::Config,
69 pub config_aux: <C::Config as Config>::Aux,
70 pub rx: C::Rx,
71 pub tx: C::Tx,
72
73 pub(crate) clocks: Option<TaskClocks>,
74 pub(crate) pulse: CodeletPulse,
75 pub(crate) is_scheduled: bool,
76 pub(crate) rx_sync_results: Vec<SyncResult>,
77 pub(crate) tx_flush_results: Vec<FlushResult>,
78
79 pub(crate) lifecycle_status: LifecycleStatus,
80 pub(crate) status: Option<C::Status>,
81
82 pub(crate) statistics: Statistics,
83
84 pub(crate) signals: C::Signals,
85
86 pub(crate) monitor: Option<SharedAppMonitor>,
87}
88
89impl<C: Codelet> Drop for CodeletInstance<C> {
90 fn drop(&mut self) {
91 if !self.is_scheduled {
92 log::warn!(
93 "Codelet instance `{}` was created and destroyed without every being scheduled",
94 self.name
95 );
96 }
97 }
98}
99
100impl<C: Codelet> CodeletInstance<C> {
101 pub(crate) fn new<S: Into<String>>(name: S, state: C, config: C::Config) -> Self {
103 let (rx, tx) = C::build_bundles(&config);
104 let rx_count = rx.channel_count();
105 let tx_count = tx.channel_count();
106 Self {
107 id: None,
108 crumbs: None,
109 name: name.into(),
110 state,
111 config,
112 config_aux: <C::Config as Config>::Aux::default(),
113 clocks: None,
114 pulse: CodeletPulse::new(),
115 is_scheduled: false,
116 rx_sync_results: vec![SyncResult::ZERO; rx_count],
117 tx_flush_results: vec![FlushResult::ZERO; tx_count],
118 lifecycle_status: LifecycleStatus::Inactive,
119 status: None,
120 statistics: Statistics {
121 rx_available_messages_count: vec![0; rx_count],
122 tx_published_message_count: vec![0; tx_count],
123 tx_last_pubtime: vec![None; tx_count],
124 ..Default::default()
125 },
126 signals: Default::default(),
127 monitor: None,
128 rx,
129 tx,
130 }
131 }
132
133 pub fn type_name(&self) -> &str {
134 std::any::type_name::<C>()
135 }
136
137 pub fn modify_state_with<F>(mut self, f: F) -> Self
138 where
139 F: Fn(&mut C) -> (),
140 {
141 f(&mut self.state);
142 self
143 }
144
145 pub fn start(&mut self) -> Result<C::Status> {
146 profiling::scope!(&format!("{}_start", self.name));
147 log::trace!("'{}' start begin", self.name);
148
149 self.on_pre_start()?;
150
151 let status = self.state.start(
152 Context {
153 clocks: &self.clocks.as_ref().unwrap(),
154 config: &self.config,
155 config_aux: &self.config_aux,
156 pulse: &self.pulse,
157 signals: &mut self.signals,
158 },
159 &mut self.rx,
160 &mut self.tx,
161 )?;
162
163 self.on_post_start()?;
164
165 log::trace!("'{}' start end ({})", self.name, status.label());
166 Ok(status)
167 }
168
169 pub fn stop(&mut self) -> Result<C::Status> {
170 profiling::scope!(&format!("{}_stop", self.name));
171 log::trace!("'{}' stop begin", self.name);
172
173 self.on_pre_stop()?;
174
175 let status = self.state.stop(
176 Context {
177 clocks: &self.clocks.as_ref().unwrap(),
178 config: &self.config,
179 config_aux: &self.config_aux,
180 pulse: &self.pulse,
181 signals: &mut self.signals,
182 },
183 &mut self.rx,
184 &mut self.tx,
185 )?;
186
187 self.on_post_stop()?;
188
189 log::trace!("'{}' stop end ({})", self.name, status.label());
190 Ok(status)
191 }
192
193 pub fn step(&mut self) -> Result<C::Status> {
194 profiling::scope!(&format!("{}_step", self.name));
195 log::trace!("'{}' step begin", self.name);
196
197 self.on_pre_step()?;
198
199 let status = self.state.step(
200 Context {
201 clocks: &self.clocks.as_ref().unwrap(),
202 config: &self.config,
203 config_aux: &self.config_aux,
204 pulse: &self.pulse,
205 signals: &mut self.signals,
206 },
207 &mut self.rx,
208 &mut self.tx,
209 )?;
210
211 self.on_post_step()?;
212
213 log::trace!("'{}' step end ({})", self.name, status.label());
214 Ok(status)
215 }
216
217 pub fn pause(&mut self) -> Result<C::Status> {
218 self.state.pause()
219 }
220
221 pub fn resume(&mut self) -> Result<C::Status> {
222 self.state.resume()
223 }
224
225 fn on_pre_start(&mut self) -> Result<()> {
226 self.lifecycle_status = LifecycleStatus::Starting;
227
228 let cc = self.rx.check_connection();
229 if !cc.is_fully_connected() {
230 log::warn!(
231 "codelet '{}' (type={}) has unconnected RX channels: {}",
232 self.name,
233 self.type_name(),
234 cc.list_unconnected()
235 .iter()
236 .map(|&i| format!("[{i}] {}", self.rx.name(i)))
237 .collect::<Vec<String>>()
238 .join(", ")
239 );
240 }
241
242 let cc = self.tx.check_connection();
243 if !cc.is_fully_connected() {
244 log::warn!(
245 "codelet '{}' (type={}) has unconnected TX channels: {}",
246 self.name,
247 self.type_name(),
248 cc.list_unconnected()
249 .iter()
250 .map(|&i| format!("[{i}] {}", self.tx.name(i)))
251 .collect::<Vec<String>>()
252 .join(", ")
253 );
254 }
255
256 self.initialize_statistics();
257
258 self.sync()?;
259
260 self.clocks.as_mut().unwrap().on_codelet_start();
261
262 self.update_statistics_rx_available_message_counts();
263
264 Ok(())
265 }
266
267 fn on_post_start(&mut self) -> Result<()> {
268 self.update_statistics_tx_published_message_count();
269
270 self.flush()?;
271
272 self.update_monitor()?;
273
274 self.lifecycle_status = LifecycleStatus::Running;
275
276 Ok(())
277 }
278
279 fn on_pre_stop(&mut self) -> Result<()> {
280 self.lifecycle_status = LifecycleStatus::Stopping;
281
282 self.sync()?;
283
284 self.clocks.as_mut().unwrap().on_codelet_stop();
285
286 self.update_statistics_rx_available_message_counts();
287
288 Ok(())
289 }
290
291 fn on_post_stop(&mut self) -> Result<()> {
292 self.update_statistics_tx_published_message_count();
293
294 self.flush()?;
295
296 self.update_monitor()?;
297
298 self.lifecycle_status = LifecycleStatus::Inactive;
299
300 Ok(())
301 }
302
303 fn on_pre_step(&mut self) -> Result<()> {
304 self.sync()?;
305
306 self.clocks.as_mut().unwrap().on_codelet_step();
307
308 self.update_statistics_rx_available_message_counts();
309
310 Ok(())
311 }
312
313 fn on_post_step(&mut self) -> Result<()> {
314 self.update_statistics_tx_published_message_count();
315
316 self.pulse.on_step_post();
317
318 self.config_aux.on_post_step();
319
320 self.flush()?;
321
322 self.update_monitor()?;
323
324 Ok(())
325 }
326
327 fn sync(&mut self) -> Result<()> {
328 self.rx_sync_results
330 .resize(self.rx.channel_count(), SyncResult::ZERO);
331
332 self.rx.sync_all(self.rx_sync_results.as_mut_slice());
333
334 for result in self.rx_sync_results.iter() {
335 if result.enforce_empty_violation {
336 return Err(eyre!("'{}': sync error (EnforceEmpty violated)", self.name,));
337 }
338 }
339
340 Ok(())
341 }
342
343 fn flush(&mut self) -> Result<()> {
344 self.tx_flush_results
346 .resize(self.tx.channel_count(), FlushResult::ZERO);
347
348 self.tx.flush_all(self.tx_flush_results.as_mut_slice());
349
350 for result in self.tx_flush_results.iter() {
351 if result.error_indicator.is_err() {
352 return Err(eyre!(
353 "'{}': flush error {}",
354 self.name,
355 result.error_indicator
356 ));
357 }
358 }
359
360 Ok(())
361 }
362
363 fn update_monitor(&mut self) -> Result<()> {
364 let step_time = self.clocks.as_ref().unwrap().codelet.step_time();
365 self.signals.on_post_execute(step_time);
366
367 self.monitor
368 .as_ref()
369 .unwrap()
370 .update_node(self.clocks.as_ref().unwrap().app_mono.now(), self)?;
371
372 Ok(())
373 }
374
375 fn initialize_statistics(&mut self) {
376 self.statistics
377 .rx_available_messages_count
378 .resize(self.rx.channel_count(), 0);
379
380 self.statistics
381 .tx_published_message_count
382 .resize(self.tx.channel_count(), 0);
383
384 self.statistics
385 .tx_last_pubtime
386 .resize(self.tx.channel_count(), None);
387 }
388
389 fn update_statistics_rx_available_message_counts(&mut self) {
390 for index in 0..self.rx.channel_count() {
391 self.statistics.rx_available_messages_count[index] = self.rx.inbox_message_count(index);
392 }
393 }
394
395 fn update_statistics_tx_published_message_count(&mut self) {
396 let codelet_step_time = self.clocks.as_ref().unwrap().app_mono.now();
398
399 for index in 0..self.tx.channel_count() {
400 let n = self.tx.outbox_message_count(index);
401 self.statistics.tx_published_message_count[index] += n;
402 if n > 0 {
403 self.statistics.tx_last_pubtime[index] = Some(codelet_step_time);
404 }
405 }
406 }
407
408 fn on_cycle_begin(&mut self, transition: Transition) {
409 let now = self
410 .clocks
411 .as_ref()
412 .expect("internal error: clocks must be set")
413 .app_mono
414 .now();
415
416 self.crumbs
417 .as_ref()
418 .expect("internal error: crumbs must be set")
419 .on_begin(
420 now,
421 self.id.expect("internal error: id must be set"),
422 transition,
423 );
424
425 self.statistics.transitions[transition].begin(now);
426 }
427
428 fn on_cycle_end(&mut self, transition: Transition, skipped: bool) {
429 let now = self
430 .clocks
431 .as_ref()
432 .expect("internal error: clocks must be set")
433 .app_mono
434 .now();
435
436 self.statistics.transitions[transition].end(now, skipped);
437
438 self.crumbs
439 .as_ref()
440 .expect("internal error: crumbs must be set")
441 .on_end();
442 }
443}
444
445impl<C: Codelet> Lifecycle for CodeletInstance<C> {
446 fn cycle(&mut self, transition: Transition) -> Result<DefaultStatus> {
447 self.on_cycle_begin(transition);
448
449 let status = match transition {
450 Transition::Start => self.start(),
451 Transition::Step => self.step(),
452 Transition::Stop => self.stop(),
453 Transition::Pause => self.pause(),
454 Transition::Resume => self.resume(),
455 }?;
456 let simplified_status = status.as_default_status();
457 self.status = Some(status);
458
459 let skipped = simplified_status == OutcomeKind::Skipped;
460 self.on_cycle_end(transition, skipped);
461
462 Ok(simplified_status)
463 }
464}