use crate::{SequenceExecutor, StateMachine};
use core::time::Duration;
use eyre::Result;
use nodo::{
app::App,
codelet::{Lifecycle, LifecycleStatus, ScheduleBuilder, ScheduleId, Transition},
prelude::{OutcomeKind, ParameterSet, ParameterWithPropertiesSet, RUNNING, SKIPPED},
};
use std::time::Instant;
/// Drives one schedule: a group of sequences wrapped in a lifecycle state machine.
///
/// Each call to `spin` applies at most one pending [`Transition`] to the state machine
/// and records timing and error information about that call.
pub struct ScheduleExecutor {
/// Identifier obtained from the schedule setup context when the executor is built.
id: ScheduleId,
/// Schedule name, copied from the builder.
name: String,
/// Thread id copied from the builder.
// NOTE(review): presumably the worker thread this schedule is assigned to — confirm.
thread_id: usize,
/// Coarse status: `Starting` initially, `Running` once a step is issued, `Stopping`
/// once a stop is pending or the schedule has terminated.
lifecycle_status: LifecycleStatus,
/// State machine wrapping the group of sequence executors.
sm: StateMachine<SequenceGroupExec>,
/// Transition to apply on the next spin; `None` means the schedule has terminated.
next_transition: Option<Transition>,
/// Optional cap on the number of `Step` transitions; once reached a `Stop` is requested.
max_step_count: Option<usize>,
/// Number of `Step` transitions issued so far.
num_steps: usize,
/// Period copied from the builder; only stored and exposed by this type.
// NOTE(review): presumably the desired interval between spins — confirm against scheduler.
period: Option<Duration>,
/// Instant at which the most recent spin began.
last_instant: Option<Instant>,
/// Time elapsed between the beginnings of the two most recent spins.
last_period: Option<Duration>,
/// Debug-formatted error of the most recent failed transition, if any.
pub(crate) last_error: Option<String>,
}
impl ScheduleExecutor {
    /// Builds an executor from a [`ScheduleBuilder`].
    ///
    /// A setup context is requested from `app` for this schedule; it provides the schedule
    /// id and is handed to every [`SequenceExecutor`] created from the builder's sequences.
    /// The executor starts in [`LifecycleStatus::Starting`] with a pending
    /// [`Transition::Start`].
    pub(crate) fn from_builder(app: &mut App, builder: ScheduleBuilder) -> Self {
        let mut context = app.schedule_setup_context(&builder);
        Self {
            id: context.id(),
            name: builder.name.clone(),
            thread_id: builder.thread_id,
            lifecycle_status: LifecycleStatus::Starting,
            // The sequence executors are created while `SequenceGroupExec::new` collects
            // the iterator, in the order the builder lists them.
            sm: StateMachine::new(SequenceGroupExec::new(builder.sequences.into_iter().map(
                |seq| {
                    SequenceExecutor::new(&mut context, builder.name.clone(), seq.name, seq.vises)
                },
            ))),
            next_transition: Some(Transition::Start),
            max_step_count: builder.max_step_count,
            num_steps: 0,
            period: builder.period,
            last_instant: None,
            last_period: None,
            last_error: None,
        }
    }

    /// Identifier of this schedule.
    pub fn id(&self) -> ScheduleId {
        self.id
    }

    /// Name of this schedule.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Thread id this schedule was configured with.
    pub fn thread_id(&self) -> usize {
        self.thread_id
    }

    /// Current coarse lifecycle status.
    pub fn lifecycle_status(&self) -> LifecycleStatus {
        self.lifecycle_status
    }

    /// Returns `true` once no further transition is pending.
    pub fn is_terminated(&self) -> bool {
        self.next_transition.is_none()
    }

    /// Period configured by the builder, if any.
    pub fn period(&self) -> Option<Duration> {
        self.period
    }

    /// Instant at which the most recent [`spin`](Self::spin) began, if any.
    pub fn last_instant(&self) -> Option<Instant> {
        self.last_instant
    }

    /// Time between the beginnings of the two most recent spins, if at least two happened.
    pub fn last_period(&self) -> Option<Duration> {
        self.last_period
    }

    /// Collects the parameters (with properties) reported by all sequences of the schedule.
    pub fn get_parameters_with_properties(
        &self,
    ) -> ParameterWithPropertiesSet<String, &'static str> {
        self.sm.inner().get_parameters_with_properties()
    }

    /// Applies the pending transition (if any) to the state machine.
    ///
    /// In order, one call:
    /// 1. updates `last_instant` / `last_period`;
    /// 2. replaces the pending transition by `Stop` when the maximum step count is reached;
    /// 3. runs the pending transition and chooses the next one: `Start`, `Step` and `Resume`
    ///    are followed by `Step`; `Pause` and `Stop` end the schedule;
    /// 4. on error stores the debug-formatted error in `last_error` and requests a `Stop`
    ///    (or terminates if the failing transition already was `Stop`);
    /// 5. marks the schedule as `Stopping` when it is terminated or about to stop.
    pub fn spin(&mut self) {
        let time_begin = Instant::now();
        if let Some(prev) = self.last_instant {
            self.last_period = Some(time_begin - prev);
        }
        self.last_instant = Some(time_begin);

        // Only a still-active schedule can be forced to stop by the step budget.
        let step_budget_exhausted =
            matches!(self.max_step_count, Some(max) if self.num_steps >= max);
        if self.next_transition.is_some() && step_budget_exhausted {
            log::debug!("reached max step count");
            self.next_transition = Some(Transition::Stop);
        }

        if let Some(transition) = self.next_transition {
            if transition == Transition::Step {
                self.lifecycle_status = LifecycleStatus::Running;
                self.num_steps += 1;
            }

            match self.sm.transition(transition) {
                Ok(OutcomeKind::Running) | Ok(OutcomeKind::Skipped) => {
                    self.next_transition = match transition {
                        Transition::Start | Transition::Step | Transition::Resume => {
                            Some(Transition::Step)
                        }
                        Transition::Pause | Transition::Stop => None,
                    };
                }
                Err(err) => {
                    self.last_error = Some(format!("{err:?}"));
                    log::error!("Schedule {:?} error: {err:?}", self.name);
                    log::warn!("Stopping schedule {:?}.", self.name);
                    // A failed `Stop` is not retried; any other failure triggers a `Stop`.
                    self.next_transition = match transition {
                        Transition::Stop => None,
                        _ => Some(Transition::Stop),
                    };
                }
            }
        }

        if matches!(self.next_transition, None | Some(Transition::Stop)) {
            self.lifecycle_status = LifecycleStatus::Stopping;
        }
    }

    /// Stops the schedule if the state machine currently accepts a `Stop` request.
    ///
    /// A failure while stopping is recorded in `last_error` and logged instead of
    /// panicking, mirroring how [`spin`](Self::spin) treats a failed `Stop`. In either
    /// case the schedule is terminated afterwards.
    pub fn finalize(&mut self) {
        self.lifecycle_status = LifecycleStatus::Stopping;
        if self.sm.is_valid_request(Transition::Stop) {
            if let Err(err) = self.sm.transition(Transition::Stop) {
                self.last_error = Some(format!("{err:?}"));
                log::error!("Schedule {:?} error: {err:?}", self.name);
            }
            self.next_transition = None;
        }
    }

    /// Forwards a parameter configuration to all sequences of the schedule.
    pub fn configure(&mut self, config: &ParameterSet<String, String>) {
        self.sm.inner_mut().configure(config)
    }
}
/// Group of sequence executors that is configured and cycled as a single unit.
pub(crate) struct SequenceGroupExec {
/// Member sequences, visited in this order when cycling, configuring and collecting
/// parameters.
items: Vec<SequenceExecutor>,
}
impl SequenceGroupExec {
    /// Creates a group from the given sequence executors, keeping their iteration order.
    pub fn new<I: IntoIterator<Item = SequenceExecutor>>(iter: I) -> Self {
        Self {
            items: iter.into_iter().collect(),
        }
    }

    /// Merges the parameters (with properties) reported by every sequence into one set.
    pub fn get_parameters_with_properties(
        &self,
    ) -> ParameterWithPropertiesSet<String, &'static str> {
        let mut result = ParameterWithPropertiesSet::default();
        for item in &self.items {
            result.extend(item.get_parameters_with_properties());
        }
        result
    }

    /// Applies `config` to every sequence in the group, in order.
    pub fn configure(&mut self, config: &ParameterSet<String, String>) {
        for item in &mut self.items {
            // `config` is already a reference; pass it on without re-borrowing.
            item.configure(config);
        }
    }
}
impl Lifecycle for SequenceGroupExec {
    /// Applies `transition` to every sequence in order.
    ///
    /// The first error aborts the pass and is returned; sequences after the failing one
    /// are not cycled. Otherwise the group reports `RUNNING` if at least one sequence
    /// reported `Running`, and `SKIPPED` if all of them were skipped.
    fn cycle(&mut self, transition: Transition) -> Result<OutcomeKind> {
        // `try_fold` stops at the first `Err`, but keeps visiting sequences after one of
        // them reported `Running`.
        let any_running = self.items.iter_mut().try_fold(false, |seen_running, seq| {
            seq.cycle(transition).map(|outcome| match outcome {
                OutcomeKind::Running => true,
                OutcomeKind::Skipped => seen_running,
            })
        })?;

        if any_running {
            RUNNING
        } else {
            SKIPPED
        }
    }
}