// nodo_runtime 0.18.5
//
// Runtime for NODO applications
// Documentation
// Copyright 2024 David Weikersdorfer

use crate::{SequenceExecutor, StateMachine};
use core::time::Duration;
use eyre::Result;
use nodo::{
    app::App,
    codelet::{Lifecycle, LifecycleStatus, ScheduleBuilder, ScheduleId, Transition},
    prelude::{OutcomeKind, ParameterSet, ParameterWithPropertiesSet, RUNNING, SKIPPED},
};
use std::time::Instant;

/// Executes a schedule, i.e. a sequential list of node sequences.
pub struct ScheduleExecutor {
    /// Identifier of this schedule, obtained from the setup context at construction.
    id: ScheduleId,
    /// Schedule name copied from the builder; also used in log messages.
    name: String,
    /// Thread id copied from the builder.
    // NOTE(review): presumably the worker thread this schedule is assigned to - confirm.
    thread_id: usize,
    /// Coarse status: `Starting` at construction, `Running` once a step has been issued,
    /// `Stopping` once a stop is pending or the schedule has terminated.
    lifecycle_status: LifecycleStatus,
    /// State machine driving the group of sequence executors.
    sm: StateMachine<SequenceGroupExec>,
    /// Transition to request on the next spin; `None` means the schedule is terminated.
    next_transition: Option<Transition>,
    /// Optional limit on the step count; once `num_steps` reaches it a stop is requested.
    max_step_count: Option<usize>,
    /// Number of `Step` transitions issued so far.
    num_steps: usize,
    /// Period copied from the builder; only stored and exposed by this type.
    // NOTE(review): presumably the desired spin period - confirm against the caller.
    period: Option<Duration>,
    /// Instant at which the most recent spin began.
    last_instant: Option<Instant>,
    /// Time elapsed between the beginnings of the two most recent spins.
    last_period: Option<Duration>,
    /// Debug-formatted error of the most recent failed transition, if any.
    pub(crate) last_error: Option<String>,
}

impl ScheduleExecutor {
    /// Creates an executor for the schedule described by `builder`.
    ///
    /// A setup context is requested from `app`, one `SequenceExecutor` is created per
    /// sequence of the builder, and the executor is primed to request `Transition::Start`
    /// on its first spin.
    pub(crate) fn from_builder(app: &mut App, builder: ScheduleBuilder) -> Self {
        let mut context = app.schedule_setup_context(&builder);
        let id = context.id();
        let name = builder.name.clone();

        // Every sequence executor receives its own copy of the schedule name.
        let sequences = builder.sequences.into_iter().map(|seq| {
            SequenceExecutor::new(&mut context, name.clone(), seq.name, seq.vises)
        });
        let sm = StateMachine::new(SequenceGroupExec::new(sequences));

        Self {
            id,
            name,
            thread_id: builder.thread_id,
            lifecycle_status: LifecycleStatus::Starting,
            sm,
            next_transition: Some(Transition::Start),
            max_step_count: builder.max_step_count,
            num_steps: 0,
            period: builder.period,
            last_instant: None,
            last_period: None,
            last_error: None,
        }
    }

    /// Identifier of this schedule.
    pub fn id(&self) -> ScheduleId {
        self.id
    }

    /// Name of this schedule.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Thread id this schedule was configured with.
    pub fn thread_id(&self) -> usize {
        self.thread_id
    }

    /// Current lifecycle status of the schedule.
    pub fn lifecycle_status(&self) -> LifecycleStatus {
        self.lifecycle_status
    }

    /// Returns `true` once no further transition is pending.
    pub fn is_terminated(&self) -> bool {
        self.next_transition.is_none()
    }

    /// Period this schedule was configured with, if any.
    pub fn period(&self) -> Option<Duration> {
        self.period
    }

    /// Instant at which the most recent spin began, or `None` before the first spin.
    pub fn last_instant(&self) -> Option<Instant> {
        self.last_instant
    }

    /// Time between the beginnings of the two most recent spins, or `None` if fewer than
    /// two spins have happened.
    pub fn last_period(&self) -> Option<Duration> {
        self.last_period
    }

    /// Collects the parameters (with properties) reported by all sequences of the schedule.
    pub fn get_parameters_with_properties(
        &self,
    ) -> ParameterWithPropertiesSet<String, &'static str> {
        self.sm.inner().get_parameters_with_properties()
    }

    /// Performs one spin of the schedule.
    ///
    /// Updates the timing bookkeeping, requests the pending transition from the state
    /// machine and selects the transition for the next spin. On an error the error is
    /// stored in `last_error`, logged, and a stop is scheduled (or the schedule terminates
    /// if the failing transition already was the stop).
    pub fn spin(&mut self) {
        let now = Instant::now();
        if let Some(previous) = self.last_instant.replace(now) {
            self.last_period = Some(now - previous);
        }

        // Once the step budget is used up the pending transition is replaced by a stop.
        let limit_reached =
            matches!(self.max_step_count, Some(limit) if self.num_steps >= limit);
        if limit_reached && self.next_transition.is_some() {
            log::debug!("reached max step count");
            self.next_transition = Some(Transition::Stop);
        }

        if let Some(transition) = self.next_transition {
            if transition == Transition::Step {
                self.lifecycle_status = LifecycleStatus::Running;
                self.num_steps += 1;
            }

            self.next_transition = match self.sm.transition(transition) {
                Ok(OutcomeKind::Running | OutcomeKind::Skipped) => match transition {
                    Transition::Start | Transition::Step | Transition::Resume => {
                        Some(Transition::Step)
                    }
                    Transition::Pause | Transition::Stop => None,
                },
                Err(err) => {
                    self.last_error = Some(format!("{err:?}"));

                    log::error!("Schedule {:?} error: {err:?}", self.name);
                    log::warn!("Stopping schedule {:?}.", self.name);

                    // A failed stop is not retried; any other failure triggers a stop.
                    match transition {
                        Transition::Stop => None,
                        Transition::Start
                        | Transition::Step
                        | Transition::Resume
                        | Transition::Pause => Some(Transition::Stop),
                    }
                }
            };
        }

        if matches!(self.next_transition, None | Some(Transition::Stop)) {
            self.lifecycle_status = LifecycleStatus::Stopping;
        }
    }

    /// Stops the schedule if the state machine currently accepts a stop request.
    ///
    /// A failure of the stop transition is recorded in `last_error` and logged instead of
    /// panicking, consistent with how `spin` treats transition errors. In either case the
    /// schedule is marked as terminated afterwards.
    pub fn finalize(&mut self) {
        self.lifecycle_status = LifecycleStatus::Stopping;
        if self.sm.is_valid_request(Transition::Stop) {
            if let Err(err) = self.sm.transition(Transition::Stop) {
                self.last_error = Some(format!("{err:?}"));
                log::error!("Schedule {:?} error: {err:?}", self.name);
            }
            self.next_transition = None;
        }
    }

    /// Forwards the given configuration to all sequences of the schedule.
    pub fn configure(&mut self, config: &ParameterSet<String, String>) {
        self.sm.inner_mut().configure(config)
    }
}

/// A group of codelet sequences which are executed one after another
///
/// The group runs as long as any item in it is running.
pub(crate) struct SequenceGroupExec {
    /// Sequence executors, cycled in this order on every transition.
    items: Vec<SequenceExecutor>,
}

impl SequenceGroupExec {
    /// Creates a group from the given sequence executors, keeping their iteration order.
    pub fn new<I: IntoIterator<Item = SequenceExecutor>>(iter: I) -> Self {
        let items = iter.into_iter().collect();
        Self { items }
    }

    /// Merges the parameters (with properties) reported by every sequence into one set.
    pub fn get_parameters_with_properties(
        &self,
    ) -> ParameterWithPropertiesSet<String, &'static str> {
        let mut merged = ParameterWithPropertiesSet::default();
        for sequence in &self.items {
            merged.extend(sequence.get_parameters_with_properties());
        }
        merged
    }

    /// Applies the given configuration to every sequence of the group.
    pub fn configure(&mut self, config: &ParameterSet<String, String>) {
        for sequence in &mut self.items {
            // `config` already is a reference and is passed on as is.
            sequence.configure(config);
        }
    }
}

impl Lifecycle for SequenceGroupExec {
    /// Applies `transition` to every sequence in order.
    ///
    /// The first error aborts the iteration and is returned; sequences after the failing
    /// one are not cycled. Otherwise the group reports running if at least one sequence
    /// reported running, and skipped if none did.
    fn cycle(&mut self, transition: Transition) -> Result<OutcomeKind> {
        let any_running = self.items.iter_mut().try_fold(false, |seen_running, item| {
            item.cycle(transition)
                .map(|outcome| seen_running || matches!(outcome, OutcomeKind::Running))
        })?;

        if any_running {
            RUNNING
        } else {
            SKIPPED
        }
    }
}