nodo/codelet/
codelet_instance.rs

1// Copyright 2023 David Weikersdorfer
2
3use crate::{
4    channels::{FlushResult, RxBundle, SyncResult, TxBundle},
5    codelet::{
6        Codelet, CodeletPulse, CodeletStatus, Context, Lifecycle, LifecycleStatus, Statistics,
7        TaskClocks, Transition,
8    },
9    config::{Config, ConfigAux},
10    core::*,
11    monitors::SharedAppMonitor,
12    signals::Signals,
13};
14use eyre::{eyre, Result};
15use serde::{Deserialize, Serialize};
16use std::sync::{Arc, RwLock};
17
18/// Unique identifier of a node across the app
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct NodeId(pub usize);
21
22#[derive(Clone)]
23pub struct SharedNodeCrumbs {
24    inner: Arc<RwLock<Option<NodeCrumbs>>>,
25}
26
27impl SharedNodeCrumbs {
28    pub fn new() -> Self {
29        Self {
30            inner: Arc::new(RwLock::new(None)),
31        }
32    }
33
34    pub fn on_begin(&self, time: Pubtime, node_id: NodeId, transition: Transition) {
35        // SAFETY: cannot panic during write
36        *self.inner.write().unwrap() = Some(NodeCrumbs {
37            time,
38            node_id,
39            transition,
40        });
41    }
42
43    pub fn on_end(&self) {
44        // SAFETY: cannot panic during write
45        *self.inner.write().unwrap() = None;
46    }
47
48    pub fn read(&self) -> Option<NodeCrumbs> {
49        // SAFETY: cannot panic during write
50        self.inner.read().unwrap().clone()
51    }
52}
53
54#[derive(Clone)]
55pub struct NodeCrumbs {
56    pub time: Pubtime,
57    pub node_id: NodeId,
58    pub transition: Transition,
59}
60
61/// Named instance of a codelet with configuration and channel bundels
62pub struct CodeletInstance<C: Codelet> {
63    pub id: Option<NodeId>,
64    pub crumbs: Option<SharedNodeCrumbs>,
65
66    pub name: String,
67    pub state: C,
68    pub config: C::Config,
69    pub config_aux: <C::Config as Config>::Aux,
70    pub rx: C::Rx,
71    pub tx: C::Tx,
72
73    pub(crate) clocks: Option<TaskClocks>,
74    pub(crate) pulse: CodeletPulse,
75    pub(crate) is_scheduled: bool,
76    pub(crate) rx_sync_results: Vec<SyncResult>,
77    pub(crate) tx_flush_results: Vec<FlushResult>,
78
79    pub(crate) lifecycle_status: LifecycleStatus,
80    pub(crate) status: Option<C::Status>,
81
82    pub(crate) statistics: Statistics,
83
84    pub(crate) signals: C::Signals,
85
86    pub(crate) monitor: Option<SharedAppMonitor>,
87}
88
89impl<C: Codelet> Drop for CodeletInstance<C> {
90    fn drop(&mut self) {
91        if !self.is_scheduled {
92            log::warn!(
93                "Codelet instance `{}` was created and destroyed without every being scheduled",
94                self.name
95            );
96        }
97    }
98}
99
100impl<C: Codelet> CodeletInstance<C> {
101    /// Creates a new instance with given state and config
102    pub(crate) fn new<S: Into<String>>(name: S, state: C, config: C::Config) -> Self {
103        let (rx, tx) = C::build_bundles(&config);
104        let rx_count = rx.channel_count();
105        let tx_count = tx.channel_count();
106        Self {
107            id: None,
108            crumbs: None,
109            name: name.into(),
110            state,
111            config,
112            config_aux: <C::Config as Config>::Aux::default(),
113            clocks: None,
114            pulse: CodeletPulse::new(),
115            is_scheduled: false,
116            rx_sync_results: vec![SyncResult::ZERO; rx_count],
117            tx_flush_results: vec![FlushResult::ZERO; tx_count],
118            lifecycle_status: LifecycleStatus::Inactive,
119            status: None,
120            statistics: Statistics {
121                rx_available_messages_count: vec![0; rx_count],
122                tx_published_message_count: vec![0; tx_count],
123                tx_last_pubtime: vec![None; tx_count],
124                ..Default::default()
125            },
126            signals: Default::default(),
127            monitor: None,
128            rx,
129            tx,
130        }
131    }
132
133    pub fn type_name(&self) -> &str {
134        std::any::type_name::<C>()
135    }
136
137    pub fn modify_state_with<F>(mut self, f: F) -> Self
138    where
139        F: Fn(&mut C) -> (),
140    {
141        f(&mut self.state);
142        self
143    }
144
145    pub fn start(&mut self) -> Result<C::Status> {
146        profiling::scope!(&format!("{}_start", self.name));
147        log::trace!("'{}' start begin", self.name);
148
149        self.on_pre_start()?;
150
151        let status = self.state.start(
152            Context {
153                clocks: &self.clocks.as_ref().unwrap(),
154                config: &self.config,
155                config_aux: &self.config_aux,
156                pulse: &self.pulse,
157                signals: &mut self.signals,
158            },
159            &mut self.rx,
160            &mut self.tx,
161        )?;
162
163        self.on_post_start()?;
164
165        log::trace!("'{}' start end ({})", self.name, status.label());
166        Ok(status)
167    }
168
169    pub fn stop(&mut self) -> Result<C::Status> {
170        profiling::scope!(&format!("{}_stop", self.name));
171        log::trace!("'{}' stop begin", self.name);
172
173        self.on_pre_stop()?;
174
175        let status = self.state.stop(
176            Context {
177                clocks: &self.clocks.as_ref().unwrap(),
178                config: &self.config,
179                config_aux: &self.config_aux,
180                pulse: &self.pulse,
181                signals: &mut self.signals,
182            },
183            &mut self.rx,
184            &mut self.tx,
185        )?;
186
187        self.on_post_stop()?;
188
189        log::trace!("'{}' stop end ({})", self.name, status.label());
190        Ok(status)
191    }
192
193    pub fn step(&mut self) -> Result<C::Status> {
194        profiling::scope!(&format!("{}_step", self.name));
195        log::trace!("'{}' step begin", self.name);
196
197        self.on_pre_step()?;
198
199        let status = self.state.step(
200            Context {
201                clocks: &self.clocks.as_ref().unwrap(),
202                config: &self.config,
203                config_aux: &self.config_aux,
204                pulse: &self.pulse,
205                signals: &mut self.signals,
206            },
207            &mut self.rx,
208            &mut self.tx,
209        )?;
210
211        self.on_post_step()?;
212
213        log::trace!("'{}' step end ({})", self.name, status.label());
214        Ok(status)
215    }
216
217    pub fn pause(&mut self) -> Result<C::Status> {
218        self.state.pause()
219    }
220
221    pub fn resume(&mut self) -> Result<C::Status> {
222        self.state.resume()
223    }
224
225    fn on_pre_start(&mut self) -> Result<()> {
226        self.lifecycle_status = LifecycleStatus::Starting;
227
228        let cc = self.rx.check_connection();
229        if !cc.is_fully_connected() {
230            log::warn!(
231                "codelet '{}' (type={}) has unconnected RX channels: {}",
232                self.name,
233                self.type_name(),
234                cc.list_unconnected()
235                    .iter()
236                    .map(|&i| format!("[{i}] {}", self.rx.name(i)))
237                    .collect::<Vec<String>>()
238                    .join(", ")
239            );
240        }
241
242        let cc = self.tx.check_connection();
243        if !cc.is_fully_connected() {
244            log::warn!(
245                "codelet '{}' (type={}) has unconnected TX channels: {}",
246                self.name,
247                self.type_name(),
248                cc.list_unconnected()
249                    .iter()
250                    .map(|&i| format!("[{i}] {}", self.tx.name(i)))
251                    .collect::<Vec<String>>()
252                    .join(", ")
253            );
254        }
255
256        self.initialize_statistics();
257
258        self.sync()?;
259
260        self.clocks.as_mut().unwrap().on_codelet_start();
261
262        self.update_statistics_rx_available_message_counts();
263
264        Ok(())
265    }
266
267    fn on_post_start(&mut self) -> Result<()> {
268        self.update_statistics_tx_published_message_count();
269
270        self.flush()?;
271
272        self.update_monitor()?;
273
274        self.lifecycle_status = LifecycleStatus::Running;
275
276        Ok(())
277    }
278
279    fn on_pre_stop(&mut self) -> Result<()> {
280        self.lifecycle_status = LifecycleStatus::Stopping;
281
282        self.sync()?;
283
284        self.clocks.as_mut().unwrap().on_codelet_stop();
285
286        self.update_statistics_rx_available_message_counts();
287
288        Ok(())
289    }
290
291    fn on_post_stop(&mut self) -> Result<()> {
292        self.update_statistics_tx_published_message_count();
293
294        self.flush()?;
295
296        self.update_monitor()?;
297
298        self.lifecycle_status = LifecycleStatus::Inactive;
299
300        Ok(())
301    }
302
303    fn on_pre_step(&mut self) -> Result<()> {
304        self.sync()?;
305
306        self.clocks.as_mut().unwrap().on_codelet_step();
307
308        self.update_statistics_rx_available_message_counts();
309
310        Ok(())
311    }
312
313    fn on_post_step(&mut self) -> Result<()> {
314        self.update_statistics_tx_published_message_count();
315
316        self.pulse.on_step_post();
317
318        self.config_aux.on_post_step();
319
320        self.flush()?;
321
322        self.update_monitor()?;
323
324        Ok(())
325    }
326
327    fn sync(&mut self) -> Result<()> {
328        // For some codelets the TX channel count might change dynamically
329        self.rx_sync_results
330            .resize(self.rx.channel_count(), SyncResult::ZERO);
331
332        self.rx.sync_all(self.rx_sync_results.as_mut_slice());
333
334        for result in self.rx_sync_results.iter() {
335            if result.enforce_empty_violation {
336                return Err(eyre!("'{}': sync error (EnforceEmpty violated)", self.name,));
337            }
338        }
339
340        Ok(())
341    }
342
343    fn flush(&mut self) -> Result<()> {
344        // For some codelets the TX channel count might change dynamically
345        self.tx_flush_results
346            .resize(self.tx.channel_count(), FlushResult::ZERO);
347
348        self.tx.flush_all(self.tx_flush_results.as_mut_slice());
349
350        for result in self.tx_flush_results.iter() {
351            if result.error_indicator.is_err() {
352                return Err(eyre!(
353                    "'{}': flush error {}",
354                    self.name,
355                    result.error_indicator
356                ));
357            }
358        }
359
360        Ok(())
361    }
362
363    fn update_monitor(&mut self) -> Result<()> {
364        let step_time = self.clocks.as_ref().unwrap().codelet.step_time();
365        self.signals.on_post_execute(step_time);
366
367        self.monitor
368            .as_ref()
369            .unwrap()
370            .update_node(self.clocks.as_ref().unwrap().app_mono.now(), self)?;
371
372        Ok(())
373    }
374
375    fn initialize_statistics(&mut self) {
376        self.statistics
377            .rx_available_messages_count
378            .resize(self.rx.channel_count(), 0);
379
380        self.statistics
381            .tx_published_message_count
382            .resize(self.tx.channel_count(), 0);
383
384        self.statistics
385            .tx_last_pubtime
386            .resize(self.tx.channel_count(), None);
387    }
388
389    fn update_statistics_rx_available_message_counts(&mut self) {
390        for index in 0..self.rx.channel_count() {
391            self.statistics.rx_available_messages_count[index] = self.rx.inbox_message_count(index);
392        }
393    }
394
395    fn update_statistics_tx_published_message_count(&mut self) {
396        // TODO this is not the pubtime of the message ..
397        let codelet_step_time = self.clocks.as_ref().unwrap().app_mono.now();
398
399        for index in 0..self.tx.channel_count() {
400            let n = self.tx.outbox_message_count(index);
401            self.statistics.tx_published_message_count[index] += n;
402            if n > 0 {
403                self.statistics.tx_last_pubtime[index] = Some(codelet_step_time);
404            }
405        }
406    }
407
408    fn on_cycle_begin(&mut self, transition: Transition) {
409        let now = self
410            .clocks
411            .as_ref()
412            .expect("internal error: clocks must be set")
413            .app_mono
414            .now();
415
416        self.crumbs
417            .as_ref()
418            .expect("internal error: crumbs must be set")
419            .on_begin(
420                now,
421                self.id.expect("internal error: id must be set"),
422                transition,
423            );
424
425        self.statistics.transitions[transition].begin(now);
426    }
427
428    fn on_cycle_end(&mut self, transition: Transition, skipped: bool) {
429        let now = self
430            .clocks
431            .as_ref()
432            .expect("internal error: clocks must be set")
433            .app_mono
434            .now();
435
436        self.statistics.transitions[transition].end(now, skipped);
437
438        self.crumbs
439            .as_ref()
440            .expect("internal error: crumbs must be set")
441            .on_end();
442    }
443}
444
445impl<C: Codelet> Lifecycle for CodeletInstance<C> {
446    fn cycle(&mut self, transition: Transition) -> Result<DefaultStatus> {
447        self.on_cycle_begin(transition);
448
449        let status = match transition {
450            Transition::Start => self.start(),
451            Transition::Step => self.step(),
452            Transition::Stop => self.stop(),
453            Transition::Pause => self.pause(),
454            Transition::Resume => self.resume(),
455        }?;
456        let simplified_status = status.as_default_status();
457        self.status = Some(status);
458
459        let skipped = simplified_status == OutcomeKind::Skipped;
460        self.on_cycle_end(transition, skipped);
461
462        Ok(simplified_status)
463    }
464}