nodo_runtime 0.18.5

Runtime for NODO applications
Documentation
// Copyright 2023 David Weikersdorfer

use crate::{
    proto_report::proto_report_pretty_print, AppExecutor, ConfigureReply, ConfigureRequest,
    ConfigureServer, InspectorServer, ProtoReportSettings,
};
use core::time::Duration;
use eyre::Result;
use nodo::{
    app::SharedScheduleMonitor,
    codelet::ScheduleBuilder,
    monitors::{AppMonitorDef, MonitorReportOptions, MonitorStatus, SharedAppMonitor},
    prelude::RuntimeControl,
};
use std::{sync::mpsc::RecvTimeoutError, time::Instant};

/// A runtime is needed to schedule codelets
///
/// The runtime owns an `AppExecutor` to which codelet schedules are added with
/// [`Runtime::add_codelet_schedule`]. [`Runtime::spin`] then drives the application until all
/// workers have finished. Optionally it publishes telemetry to an inspector
/// ([`Runtime::enable_inspector`]) and serves parameter configuration requests
/// ([`Runtime::enable_configure`]).
///
/// # Examples
///
/// ```
/// use core::time::Duration;
/// use nodo::{codelet::ScheduleBuilder, prelude::*};
/// use nodo_runtime::Runtime;
/// use nodo_std::{Sink, Source};
///
/// #[derive(Debug, Clone)]
/// struct Ping;
///
/// fn main() -> eyre::Result<()> {
///     let mut rt = Runtime::new();
///
///     // Uncomment to enable publishing inspector telemetry
///     // rt.enable_inspector("tcp://localhost:54399")?;
///
///     let mut source = Source::new(|| Ping).into_instance("source", ());
///
///     let mut sink = Sink::new(|x| {
///         println!("{x:?}");
///         SUCCESS
///     })
///     .into_instance("sink", ());
///
///     connect(&mut source.tx, &mut sink.rx)?;
///
///     rt.add_codelet_schedule(
///         ScheduleBuilder::new()
///             .with_period(Duration::from_millis(10))
///             .with_max_step_count(10)
///             .with(source)
///             .with(sink)
///             .into(),
///     );
///
///     rt.enable_terminate_on_ctrl_c();
///
///     rt.spin();
///
///     Ok(())
/// }
/// ```
pub struct Runtime {
    /// Sending half of the bounded control channel. It is cloned via [`Runtime::tx_control`] so
    /// that other parties (e.g. the Ctrl+C handler) can send `RuntimeControl` messages.
    tx_control: std::sync::mpsc::SyncSender<RuntimeControl>,
    /// Receiving half of the control channel. It is polled by [`Runtime::spin`], which forwards
    /// received messages to the executor.
    rx_control: std::sync::mpsc::Receiver<RuntimeControl>,

    /// Executor running all added codelet schedules.
    executor: AppExecutor,
    /// Set once a schedule was added. After that, `setup_monitors` panics because it would
    /// replace the executor.
    used_executor: bool,
    /// Inspector server, present only after [`Runtime::enable_inspector`] was called.
    inspector_server: Option<InspectorServer>,
    /// The next inspector report is sent once the current time passes this instant.
    inspector_next_update: Instant,
    /// The next inspector report with `include_info` set is sent once the current time
    /// passes this instant.
    inspector_next_info_update: Instant,

    /// Configure server, present only after [`Runtime::enable_configure`] was called.
    configure_server: Option<ConfigureServer>,

    /// Schedule monitor shared with the executor.
    schedule_monitor: SharedScheduleMonitor,

    /// App monitor shared with the executor. Its status is checked periodically by `spin`.
    monitor: SharedAppMonitor,
    /// The monitor status is checked again once the current time passes this instant.
    monitor_check_next: Instant,
    /// While not nominal, the next warning report is logged once the current time passes
    /// this instant.
    monitor_report_next: Instant,
    /// Whether the last monitor check saw a non-nominal status. It starts as `true`, so the
    /// first nominal check also logs a report.
    monitor_was_not_ok: bool,

    /// The executor is checked for stalled schedules once the current time passes this instant.
    stalled_check_next: Instant,
}

impl Runtime {
    const TERMINATION_CHECK_INTERVAL: f64 = 0.020;
    const INSPECTOR_UPDATE_INTERVAL: f64 = 0.200;
    const INSPECTOR_INFO_UPDATE_INTERVAL: f64 = 3.000;
    const HEALTH_CHECK_INTERVAL: f64 = 0.100;
    const HEALTH_REPORT_INTERVAL: f64 = 4.000;
    const STALLED_CHECK_INTERVAL: f64 = 1.000;

    pub fn new() -> Self {
        let monitor: SharedAppMonitor = AppMonitorDef::default().into();
        let schedule_monitor = SharedScheduleMonitor::new();

        let (tx_control, rx_control) = std::sync::mpsc::sync_channel(16);
        Self {
            tx_control,
            rx_control,
            executor: AppExecutor::new(schedule_monitor.clone(), monitor.clone()),
            used_executor: false,
            inspector_server: None,
            inspector_next_update: Instant::now(),
            inspector_next_info_update: Instant::now(),
            configure_server: None,
            schedule_monitor,
            monitor,
            monitor_check_next: Instant::now(),
            monitor_report_next: Instant::now(),
            monitor_was_not_ok: true,
            stalled_check_next: Instant::now(),
        }
    }

    pub fn enable_inspector(&mut self, address: &str) -> Result<()> {
        self.inspector_server = Some(InspectorServer::open(address)?);
        Ok(())
    }

    pub fn enable_configure(&mut self, address: &str) -> Result<()> {
        self.configure_server = Some(ConfigureServer::open(address)?);
        Ok(())
    }

    pub fn setup_monitors(&mut self, monitors: AppMonitorDef) -> Result<SharedAppMonitor> {
        if self.used_executor {
            panic!("cannot setup monitors after using add_codelet_schedule")
        }

        let mon: SharedAppMonitor = monitors.into();
        self.executor = AppExecutor::new(self.schedule_monitor.clone(), mon.clone());
        self.monitor = mon.clone();

        Ok(mon)
    }

    pub fn add_codelet_schedule(&mut self, builder: ScheduleBuilder) {
        self.used_executor = true;
        self.executor.push(builder)
    }

    pub fn tx_control(&mut self) -> std::sync::mpsc::SyncSender<RuntimeControl> {
        self.tx_control.clone()
    }

    /// If called the program will stop when Ctrl+C is pressed
    pub fn enable_terminate_on_ctrl_c(&mut self) {
        log::info!("Press Ctrl+C to stop..");

        let tx = self.tx_control();
        ctrlc::set_handler(move || {
            tx.send(RuntimeControl::RequestStop)
                .expect("Could not send signal on channel.")
        })
        .expect("Error setting Ctrl-C handler");
    }

    pub fn spin(&mut self) {
        let sleep_duration = Duration::from_secs_f64(Self::TERMINATION_CHECK_INTERVAL);

        loop {
            // handle control commands
            match self.rx_control.recv_timeout(sleep_duration) {
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => {
                    panic!("control channel disconnected");
                }
                Ok(req) => {
                    self.executor.request(req);
                }
            }

            // handle worker replies
            self.executor.process_worker_replies();

            // check if we are done
            if self.executor.is_finished() {
                if self.executor.has_panicked() {
                    panic!("Some workers have panicked.");
                } else {
                    log::info!("All workers finished gracefully.");
                    break;
                }
            }

            // configure
            if let Some(configure) = self.configure_server.as_ref() {
                configure
                    .handle_requests(|req| match req {
                        ConfigureRequest::Configure(config) => {
                            match self.tx_control.send(RuntimeControl::Configure(config)) {
                                Ok(()) => ConfigureReply::Success,
                                Err(err) => ConfigureReply::Failure(format!("{err:?}")),
                            }
                        }
                        ConfigureRequest::List => {
                            ConfigureReply::List(self.executor.get_parameters_with_properties())
                        }
                    })
                    .ok();
            }

            let now = Instant::now();

            // monitor
            {
                let opts = MonitorReportOptions {
                    include_nominal: true,
                    always_show_monitors: true,
                };

                let print_monitor_report_impl = |warn| match self.monitor.as_report_string(opts) {
                    Ok(report) => {
                        if warn {
                            log::warn!("Monitor report:\n{}", report);
                        } else {
                            log::info!("Monitor report:\n{}", report);
                        }
                    }
                    Err(err) => {
                        log::error!("failed to create monitor: {err:?}");
                    }
                };

                if now > self.monitor_check_next {
                    if self.monitor.status() != Ok(MonitorStatus::Nominal) {
                        self.monitor_was_not_ok = true;
                        if now > self.monitor_report_next {
                            print_monitor_report_impl(true);
                            self.monitor_report_next =
                                now + Duration::from_secs_f64(Self::HEALTH_REPORT_INTERVAL);
                        }
                    } else {
                        if self.monitor_was_not_ok {
                            print_monitor_report_impl(false);
                            self.monitor_was_not_ok = false;
                        }
                        self.monitor_report_next =
                            now + Duration::from_secs_f64(Self::HEALTH_REPORT_INTERVAL);
                    }
                    self.monitor_check_next =
                        now + Duration::from_secs_f64(Self::HEALTH_CHECK_INTERVAL);
                }
            }

            // check for stalls
            if now > self.stalled_check_next {
                self.executor.check_for_stalled_schedules();
                self.stalled_check_next =
                    now + Duration::from_secs_f64(Self::STALLED_CHECK_INTERVAL);
            }

            // inspector
            if let Some(inspector) = self.inspector_server.as_mut() {
                if now > self.inspector_next_update {
                    self.inspector_next_update =
                        now + Duration::from_secs_f64(Self::INSPECTOR_UPDATE_INTERVAL);

                    let include_info = if now > self.inspector_next_info_update {
                        self.inspector_next_info_update =
                            now + Duration::from_secs_f64(Self::INSPECTOR_INFO_UPDATE_INTERVAL);
                        true
                    } else {
                        false
                    };

                    let settings = ProtoReportSettings { include_info };

                    match inspector.send_report(self.executor.to_proto_report(&settings)) {
                        Err(err) => log::error!("inspector could not send report: {err:?}"),
                        Ok(()) => {}
                    }
                }
            }
        }

        self.executor.finalize();

        proto_report_pretty_print(
            self.executor
                .to_proto_report(&ProtoReportSettings::default()),
        )
        .ok();
    }

    #[deprecated(since = "0.2.0", note = "use `enable_terminate_on_ctrl_c` instead")]
    pub fn wait_for_ctrl_c(&mut self) {
        self.enable_terminate_on_ctrl_c();
        self.spin();
    }
}