use crate::{
channels::{FlushResult, RxBundle, SyncResult, TxBundle},
codelet::{
Codelet, CodeletPulse, CodeletStatus, Context, Lifecycle, LifecycleStatus, Statistics,
TaskClocks, Transition,
},
config::{Config, ConfigAux},
core::*,
monitors::SharedAppMonitor,
signals::Signals,
};
use eyre::{eyre, Result};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, RwLock};
/// Identifier of a node, wrapping a plain `usize`.
///
/// Stored on a `CodeletInstance` (field `id`) and recorded in `NodeCrumbs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeId(pub usize);
/// Cloneable handle to an optional [`NodeCrumbs`] record kept behind an `Arc<RwLock<..>>`.
///
/// Cloning the handle clones the `Arc`, so all clones observe the same record.
/// The record is `Some` after `on_begin` and `None` after `on_end` (and initially).
#[derive(Clone)]
pub struct SharedNodeCrumbs {
// Shared slot; `None` whenever no record is currently stored.
inner: Arc<RwLock<Option<NodeCrumbs>>>,
}
impl SharedNodeCrumbs {
    /// Creates a handle whose shared slot starts out empty (`None`).
    pub fn new() -> Self {
        let inner = Arc::new(RwLock::new(None));
        Self { inner }
    }

    /// Stores a record built from the given values, replacing any previous record.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn on_begin(&self, time: Pubtime, node_id: NodeId, transition: Transition) {
        // Build the record before taking the lock so the guard is held only
        // for the assignment itself.
        let record = NodeCrumbs {
            time,
            node_id,
            transition,
        };
        let mut slot = self.inner.write().unwrap();
        *slot = Some(record);
    }

    /// Clears the shared slot.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn on_end(&self) {
        let mut slot = self.inner.write().unwrap();
        *slot = None;
    }

    /// Returns a copy of the currently stored record, or `None` if the slot is empty.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    pub fn read(&self) -> Option<NodeCrumbs> {
        let slot = self.inner.read().unwrap();
        Option::clone(&slot)
    }
}
/// Record of a transition that has begun on a node, as stored by `SharedNodeCrumbs::on_begin`.
#[derive(Clone)]
pub struct NodeCrumbs {
/// Time passed to `on_begin` (the app-monotonic "now" when written by `CodeletInstance`).
pub time: Pubtime,
/// Node on which the transition was started.
pub node_id: NodeId,
/// The transition that was started.
pub transition: Transition,
}
/// A codelet of type `C` bundled with its configuration, its RX/TX channel bundles and the
/// bookkeeping that is maintained while it is driven through lifecycle transitions.
pub struct CodeletInstance<C: Codelet> {
/// Node identifier; `None` after construction. Must be `Some` before `cycle` is called.
pub id: Option<NodeId>,
/// Shared crumbs handle; `None` after construction. Must be `Some` before `cycle` is called.
pub crumbs: Option<SharedNodeCrumbs>,
/// Instance name used in log messages, error messages and profiling scope labels.
pub name: String,
/// The codelet itself; its `start`/`step`/`stop`/`pause`/`resume` are invoked by this wrapper.
pub state: C,
/// Codelet configuration; used to build the bundles and handed to the codelet via `Context`.
pub config: C::Config,
/// Auxiliary configuration state, default-initialised; `on_post_step` is called on it after
/// every step.
pub config_aux: <C::Config as Config>::Aux,
/// Receiving channel bundle built by `C::build_bundles`.
pub rx: C::Rx,
/// Transmitting channel bundle built by `C::build_bundles`.
pub tx: C::Tx,
/// Clocks; `None` after construction. Must be `Some` before any transition runs.
pub(crate) clocks: Option<TaskClocks>,
/// Pulse handed to the codelet via `Context`; `on_step_post` is called on it after every step.
pub(crate) pulse: CodeletPulse,
/// `false` after construction; if still `false` on drop a warning is logged.
pub(crate) is_scheduled: bool,
/// Per-RX-channel results of the most recent `sync_all` call.
pub(crate) rx_sync_results: Vec<SyncResult>,
/// Per-TX-channel results of the most recent `flush_all` call.
pub(crate) tx_flush_results: Vec<FlushResult>,
/// Lifecycle state maintained by the start/stop hooks
/// (`Inactive` -> `Starting` -> `Running` -> `Stopping` -> `Inactive`).
pub(crate) lifecycle_status: LifecycleStatus,
/// Status returned by the most recent successful transition run through `cycle`.
pub(crate) status: Option<C::Status>,
/// Per-channel message counters, last publish times and per-transition statistics.
pub(crate) statistics: Statistics,
/// Signals handed to the codelet via `Context`; `on_post_execute` is called on them after
/// start, step and stop.
pub(crate) signals: C::Signals,
/// Monitor updated after start, step and stop; `None` after construction and must be `Some`
/// before any of those transitions runs.
pub(crate) monitor: Option<SharedAppMonitor>,
}
/// Logs a warning when an instance is dropped although it was never marked as scheduled.
impl<C: Codelet> Drop for CodeletInstance<C> {
    fn drop(&mut self) {
        // `is_scheduled` starts out `false`; an instance that is dropped in that
        // state was built but never handed to a schedule, which is most likely a
        // wiring mistake worth surfacing.
        if !self.is_scheduled {
            log::warn!(
                "Codelet instance `{}` was created and destroyed without ever being scheduled",
                self.name
            );
        }
    }
}
impl<C: Codelet> CodeletInstance<C> {
/// Creates an inactive, unscheduled instance named `name` for the codelet `state`.
///
/// The RX/TX bundles are built from `config` via `C::build_bundles`, and all
/// per-channel vectors (sync/flush results, statistics counters) are sized to the
/// channel counts of those bundles. `id`, `crumbs`, `clocks` and `monitor` are left
/// unset.
pub(crate) fn new<S>(name: S, state: C, config: C::Config) -> Self
where
    S: Into<String>,
{
    let (rx, tx) = C::build_bundles(&config);
    let num_rx = rx.channel_count();
    let num_tx = tx.channel_count();

    // Only the per-channel vectors need explicit sizing; everything else in the
    // statistics starts from its default.
    let statistics = Statistics {
        rx_available_messages_count: vec![0; num_rx],
        tx_published_message_count: vec![0; num_tx],
        tx_last_pubtime: vec![None; num_tx],
        ..Default::default()
    };

    Self {
        id: None,
        crumbs: None,
        name: name.into(),
        state,
        config,
        config_aux: <C::Config as Config>::Aux::default(),
        rx,
        tx,
        clocks: None,
        pulse: CodeletPulse::new(),
        is_scheduled: false,
        rx_sync_results: vec![SyncResult::ZERO; num_rx],
        tx_flush_results: vec![FlushResult::ZERO; num_tx],
        lifecycle_status: LifecycleStatus::Inactive,
        status: None,
        statistics,
        signals: Default::default(),
        monitor: None,
    }
}
/// Returns the name of the codelet type `C` as reported by `std::any::type_name`.
///
/// Used in the connection warnings logged before the start transition.
pub fn type_name(&self) -> &str {
std::any::type_name::<C>()
}
/// Applies `f` to the codelet state and returns the instance, builder-style.
///
/// `f` is invoked exactly once, so any closure callable once is accepted. This
/// includes closures that move captured values into the state; every closure that
/// satisfied the previous `Fn` bound still satisfies `FnOnce`.
pub fn modify_state_with<F>(mut self, f: F) -> Self
where
    F: FnOnce(&mut C),
{
    f(&mut self.state);
    self
}
/// Runs the start transition: `on_pre_start`, the codelet's own `start`, then `on_post_start`.
///
/// Returns the status reported by the codelet.
///
/// # Errors
/// Propagates any error from the pre-hook, the codelet or the post-hook. If the
/// codelet fails, the post-hook is not run.
///
/// # Panics
/// Panics if `clocks` or `monitor` have not been set.
pub fn start(&mut self) -> Result<C::Status> {
    profiling::scope!(&format!("{}_start", self.name));
    log::trace!("'{}' start begin", self.name);

    self.on_pre_start()?;

    // The context borrows individual fields so that the state and both bundles
    // can be passed mutably alongside it.
    let status = self.state.start(
        Context {
            clocks: self.clocks.as_ref().unwrap(),
            config: &self.config,
            config_aux: &self.config_aux,
            pulse: &self.pulse,
            signals: &mut self.signals,
        },
        &mut self.rx,
        &mut self.tx,
    )?;

    self.on_post_start()?;

    log::trace!("'{}' start end ({})", self.name, status.label());
    Ok(status)
}
/// Runs the stop transition: `on_pre_stop`, the codelet's own `stop`, then `on_post_stop`.
///
/// Returns the status reported by the codelet.
///
/// # Errors
/// Propagates any error from the pre-hook, the codelet or the post-hook. If the
/// codelet fails, the post-hook is not run.
///
/// # Panics
/// Panics if `clocks` or `monitor` have not been set.
pub fn stop(&mut self) -> Result<C::Status> {
    profiling::scope!(&format!("{}_stop", self.name));
    log::trace!("'{}' stop begin", self.name);

    self.on_pre_stop()?;

    // The context borrows individual fields so that the state and both bundles
    // can be passed mutably alongside it.
    let status = self.state.stop(
        Context {
            clocks: self.clocks.as_ref().unwrap(),
            config: &self.config,
            config_aux: &self.config_aux,
            pulse: &self.pulse,
            signals: &mut self.signals,
        },
        &mut self.rx,
        &mut self.tx,
    )?;

    self.on_post_stop()?;

    log::trace!("'{}' stop end ({})", self.name, status.label());
    Ok(status)
}
/// Runs the step transition: `on_pre_step`, the codelet's own `step`, then `on_post_step`.
///
/// Returns the status reported by the codelet.
///
/// # Errors
/// Propagates any error from the pre-hook, the codelet or the post-hook. If the
/// codelet fails, the post-hook is not run.
///
/// # Panics
/// Panics if `clocks` or `monitor` have not been set.
pub fn step(&mut self) -> Result<C::Status> {
    profiling::scope!(&format!("{}_step", self.name));
    log::trace!("'{}' step begin", self.name);

    self.on_pre_step()?;

    // The context borrows individual fields so that the state and both bundles
    // can be passed mutably alongside it.
    let status = self.state.step(
        Context {
            clocks: self.clocks.as_ref().unwrap(),
            config: &self.config,
            config_aux: &self.config_aux,
            pulse: &self.pulse,
            signals: &mut self.signals,
        },
        &mut self.rx,
        &mut self.tx,
    )?;

    self.on_post_step()?;

    log::trace!("'{}' step end ({})", self.name, status.label());
    Ok(status)
}
/// Forwards to the codelet's `pause` and returns its result.
///
/// Unlike `start`/`step`/`stop`, no sync, flush, statistics or monitor hooks are
/// run around this call.
pub fn pause(&mut self) -> Result<C::Status> {
self.state.pause()
}
/// Forwards to the codelet's `resume` and returns its result.
///
/// Unlike `start`/`step`/`stop`, no sync, flush, statistics or monitor hooks are
/// run around this call.
pub fn resume(&mut self) -> Result<C::Status> {
self.state.resume()
}
/// Hook run before the codelet's `start`.
///
/// Marks the instance as `Starting`, logs a warning for every RX/TX bundle that is
/// not fully connected (this does not abort the start), sizes the statistics
/// vectors, syncs the RX bundle, notifies the clocks and records the RX message
/// counts.
///
/// # Errors
/// Returns an error if syncing the RX bundle fails.
///
/// # Panics
/// Panics if `clocks` has not been set.
fn on_pre_start(&mut self) -> Result<()> {
    self.lifecycle_status = LifecycleStatus::Starting;

    // The channel list is formatted inside the macro call so the work is only
    // done when the warning is actually emitted.
    let rx_connections = self.rx.check_connection();
    if !rx_connections.is_fully_connected() {
        log::warn!(
            "codelet '{}' (type={}) has unconnected RX channels: {}",
            self.name,
            self.type_name(),
            rx_connections
                .list_unconnected()
                .iter()
                .map(|&i| format!("[{i}] {}", self.rx.name(i)))
                .collect::<Vec<String>>()
                .join(", ")
        );
    }

    let tx_connections = self.tx.check_connection();
    if !tx_connections.is_fully_connected() {
        log::warn!(
            "codelet '{}' (type={}) has unconnected TX channels: {}",
            self.name,
            self.type_name(),
            tx_connections
                .list_unconnected()
                .iter()
                .map(|&i| format!("[{i}] {}", self.tx.name(i)))
                .collect::<Vec<String>>()
                .join(", ")
        );
    }

    self.initialize_statistics();
    self.sync()?;
    // Same invariant message as the cycle hooks, so a missing clock is reported
    // identically no matter which entry point trips over it.
    self.clocks
        .as_mut()
        .expect("internal error: clocks must be set")
        .on_codelet_start();
    self.update_statistics_rx_available_message_counts();
    Ok(())
}
/// Hook run after the codelet's `start` succeeded.
///
/// Accounts for the messages published during start, flushes the TX bundle, updates
/// the monitor and finally marks the instance as `Running`. Any flush or monitor
/// error is returned and leaves `lifecycle_status` at its previous value.
fn on_post_start(&mut self) -> Result<()> {
// Counts are read from the outboxes, so this has to happen before the flush.
self.update_statistics_tx_published_message_count();
self.flush()?;
// Runs before the status switches to `Running` below.
self.update_monitor()?;
self.lifecycle_status = LifecycleStatus::Running;
Ok(())
}
/// Hook run before the codelet's `stop`.
///
/// Marks the instance as `Stopping`, syncs the RX bundle, notifies the clocks and
/// records the RX message counts.
///
/// # Errors
/// Returns an error if syncing the RX bundle fails.
///
/// # Panics
/// Panics if `clocks` has not been set.
fn on_pre_stop(&mut self) -> Result<()> {
    self.lifecycle_status = LifecycleStatus::Stopping;
    self.sync()?;
    // Same invariant message as the cycle hooks, so a missing clock is reported
    // identically no matter which entry point trips over it.
    self.clocks
        .as_mut()
        .expect("internal error: clocks must be set")
        .on_codelet_stop();
    self.update_statistics_rx_available_message_counts();
    Ok(())
}
/// Hook run after the codelet's `stop` succeeded.
///
/// Accounts for the messages published during stop, flushes the TX bundle, updates
/// the monitor and finally marks the instance as `Inactive`. Any flush or monitor
/// error is returned and leaves `lifecycle_status` at its previous value.
fn on_post_stop(&mut self) -> Result<()> {
// Counts are read from the outboxes, so this has to happen before the flush.
self.update_statistics_tx_published_message_count();
self.flush()?;
// Runs before the status switches to `Inactive` below.
self.update_monitor()?;
self.lifecycle_status = LifecycleStatus::Inactive;
Ok(())
}
/// Hook run before the codelet's `step`.
///
/// Syncs the RX bundle, notifies the clocks and records the RX message counts.
///
/// # Errors
/// Returns an error if syncing the RX bundle fails.
///
/// # Panics
/// Panics if `clocks` has not been set.
fn on_pre_step(&mut self) -> Result<()> {
    self.sync()?;
    // Same invariant message as the cycle hooks, so a missing clock is reported
    // identically no matter which entry point trips over it.
    self.clocks
        .as_mut()
        .expect("internal error: clocks must be set")
        .on_codelet_step();
    self.update_statistics_rx_available_message_counts();
    Ok(())
}
/// Hook run after the codelet's `step` succeeded.
///
/// Accounts for the messages published during the step, notifies the pulse and the
/// auxiliary config that a step finished, flushes the TX bundle and updates the
/// monitor.
///
/// # Errors
/// Returns the first error from flushing or from the monitor update.
fn on_post_step(&mut self) -> Result<()> {
    // Counts are read from the outboxes, so this has to happen before the flush.
    self.update_statistics_tx_published_message_count();

    self.pulse.on_step_post();
    self.config_aux.on_post_step();

    self.flush()?;
    self.update_monitor()
}
/// Syncs all RX channels and stores the per-channel results in `rx_sync_results`.
///
/// # Errors
/// Returns an error if any channel reports an `enforce_empty_violation`.
fn sync(&mut self) -> Result<()> {
    // Keep one result slot per channel before handing the slice to the bundle.
    let channel_count = self.rx.channel_count();
    self.rx_sync_results.resize(channel_count, SyncResult::ZERO);
    self.rx.sync_all(self.rx_sync_results.as_mut_slice());

    let violated = self
        .rx_sync_results
        .iter()
        .any(|result| result.enforce_empty_violation);
    if violated {
        return Err(eyre!("'{}': sync error (EnforceEmpty violated)", self.name,));
    }
    Ok(())
}
/// Flushes all TX channels and stores the per-channel results in `tx_flush_results`.
///
/// # Errors
/// Returns an error describing the first channel whose `error_indicator` reports
/// an error.
fn flush(&mut self) -> Result<()> {
    // Keep one result slot per channel before handing the slice to the bundle.
    let channel_count = self.tx.channel_count();
    self.tx_flush_results.resize(channel_count, FlushResult::ZERO);
    self.tx.flush_all(self.tx_flush_results.as_mut_slice());

    let first_failure = self
        .tx_flush_results
        .iter()
        .find(|result| result.error_indicator.is_err());
    match first_failure {
        Some(failed) => Err(eyre!(
            "'{}': flush error {}",
            self.name,
            failed.error_indicator
        )),
        None => Ok(()),
    }
}
/// Reports the finished execution to the signals and to the monitor.
///
/// The signals receive the codelet clock's step time; the monitor receives the
/// current app-monotonic time together with this instance.
///
/// # Errors
/// Propagates any error returned by the monitor's `update_node`.
///
/// # Panics
/// Panics if `clocks` or `monitor` have not been set.
fn update_monitor(&mut self) -> Result<()> {
    let clocks = self
        .clocks
        .as_ref()
        .expect("internal error: clocks must be set");
    let step_time = clocks.codelet.step_time();
    self.signals.on_post_execute(step_time);

    let monitor = self
        .monitor
        .as_ref()
        .expect("internal error: monitor must be set");
    // The timestamp is taken only after the signals were updated, right before
    // the monitor call.
    let now = clocks.app_mono.now();
    monitor.update_node(now, self)?;
    Ok(())
}
/// Resizes the per-channel statistics vectors to the current RX/TX channel counts.
///
/// Existing entries are kept; newly added entries start at `0` / `None`.
fn initialize_statistics(&mut self) {
    let rx_channels = self.rx.channel_count();
    let tx_channels = self.tx.channel_count();

    let stats = &mut self.statistics;
    stats.rx_available_messages_count.resize(rx_channels, 0);
    stats.tx_published_message_count.resize(tx_channels, 0);
    stats.tx_last_pubtime.resize(tx_channels, None);
}
/// Records, for every RX channel, the number of messages currently in its inbox.
///
/// # Panics
/// Panics if the statistics vector is shorter than the number of RX channels.
fn update_statistics_rx_available_message_counts(&mut self) {
    let channel_count = self.rx.channel_count();
    let available = &mut self.statistics.rx_available_messages_count;
    for channel in 0..channel_count {
        available[channel] = self.rx.inbox_message_count(channel);
    }
}
/// Adds the current outbox size of every TX channel to its published-message total
/// and stamps channels that published something with the current app-monotonic time.
///
/// # Panics
/// Panics if `clocks` has not been set or if a statistics vector is shorter than
/// the number of TX channels.
fn update_statistics_tx_published_message_count(&mut self) {
    let now = self.clocks.as_ref().unwrap().app_mono.now();
    let stats = &mut self.statistics;
    for channel in 0..self.tx.channel_count() {
        let published = self.tx.outbox_message_count(channel);
        stats.tx_published_message_count[channel] += published;
        // Only channels that actually published get a fresh timestamp.
        if published > 0 {
            stats.tx_last_pubtime[channel] = Some(now);
        }
    }
}
/// Marks the beginning of `transition`: writes the crumbs record and starts the
/// per-transition statistics, both stamped with the current app-monotonic time.
///
/// # Panics
/// Panics if `clocks`, `crumbs` or `id` have not been set.
fn on_cycle_begin(&mut self, transition: Transition) {
    let clocks = self
        .clocks
        .as_ref()
        .expect("internal error: clocks must be set");
    let now = clocks.app_mono.now();

    let crumbs = self
        .crumbs
        .as_ref()
        .expect("internal error: crumbs must be set");
    let node_id = self.id.expect("internal error: id must be set");
    crumbs.on_begin(now, node_id, transition);

    self.statistics.transitions[transition].begin(now);
}
/// Marks the end of `transition`: finishes the per-transition statistics (noting
/// whether the transition was `skipped`) and clears the crumbs record.
///
/// # Panics
/// Panics if `clocks` or `crumbs` have not been set.
fn on_cycle_end(&mut self, transition: Transition, skipped: bool) {
    let clocks = self
        .clocks
        .as_ref()
        .expect("internal error: clocks must be set");
    let now = clocks.app_mono.now();

    self.statistics.transitions[transition].end(now, skipped);

    let crumbs = self
        .crumbs
        .as_ref()
        .expect("internal error: crumbs must be set");
    crumbs.on_end();
}
}
impl<C: Codelet> Lifecycle for CodeletInstance<C> {
    /// Executes `transition` on the codelet and returns its simplified status.
    ///
    /// The call is bracketed by `on_cycle_begin` / `on_cycle_end`. The full status
    /// returned by the codelet is stored in `self.status`.
    ///
    /// # Errors
    /// Propagates the error of the executed transition. In that case the function
    /// returns immediately: `self.status` is left untouched and `on_cycle_end` is
    /// not called, so the crumbs record written at the beginning stays in place.
    // NOTE(review): leaving the crumbs set on failure looks deliberate (it keeps
    // pointing at the failing node) - confirm.
    ///
    /// # Panics
    /// Panics if `clocks`, `crumbs` or `id` have not been set.
    fn cycle(&mut self, transition: Transition) -> Result<DefaultStatus> {
        self.on_cycle_begin(transition);

        let outcome = match transition {
            Transition::Start => self.start(),
            Transition::Step => self.step(),
            Transition::Stop => self.stop(),
            Transition::Pause => self.pause(),
            Transition::Resume => self.resume(),
        };
        let status = outcome?;

        let simplified_status = status.as_default_status();
        let skipped = simplified_status == OutcomeKind::Skipped;
        self.status = Some(status);

        self.on_cycle_end(transition, skipped);
        Ok(simplified_status)
    }
}