use crate::{
proto_report::proto_report_pretty_print, AppExecutor, ConfigureReply, ConfigureRequest,
ConfigureServer, InspectorServer, ProtoReportSettings,
};
use core::time::Duration;
use eyre::Result;
use nodo::{
app::SharedScheduleMonitor,
codelet::ScheduleBuilder,
monitors::{AppMonitorDef, MonitorReportOptions, MonitorStatus, SharedAppMonitor},
prelude::RuntimeControl,
};
use std::{sync::mpsc::RecvTimeoutError, time::Instant};
/// Owns the [`AppExecutor`] running the codelet schedules, the bounded control channel used to
/// forward [`RuntimeControl`] requests to it, and the optional inspector/configure servers, and
/// drives all of them from [`Runtime::spin`].
pub struct Runtime {
    /// Sending half of the bounded control channel; clones are handed out by `tx_control`.
    tx_control: std::sync::mpsc::SyncSender<RuntimeControl>,
    /// Receiving half of the control channel; polled by `spin`, which forwards each received
    /// message to the executor.
    rx_control: std::sync::mpsc::Receiver<RuntimeControl>,
    /// Executor running the schedules added via `add_codelet_schedule`.
    executor: AppExecutor,
    /// Set once a schedule has been added; `setup_monitors` panics after that.
    used_executor: bool,
    /// Inspector server; `Some` only after `enable_inspector` succeeded.
    inspector_server: Option<InspectorServer>,
    /// Time after which `spin` sends the next report to the inspector.
    inspector_next_update: Instant,
    /// Time after which the next inspector report is sent with `include_info` set.
    inspector_next_info_update: Instant,
    /// Configure server; `Some` only after `enable_configure` succeeded.
    configure_server: Option<ConfigureServer>,
    /// Schedule monitor shared with the executor.
    schedule_monitor: SharedScheduleMonitor,
    /// Application monitor shared with the executor; its status is checked by `spin`.
    monitor: SharedAppMonitor,
    /// Time after which the next monitor status check runs.
    monitor_check_next: Instant,
    /// Time after which a warning monitor report may be logged again while the status is not
    /// nominal.
    monitor_report_next: Instant,
    /// Whether a non-nominal monitor status was seen since the last logged nominal report.
    /// Starts `true` so the first nominal check logs a report.
    monitor_was_not_ok: bool,
    /// Time after which the next stalled-schedule check runs.
    stalled_check_next: Instant,
}
impl Runtime {
const TERMINATION_CHECK_INTERVAL: f64 = 0.020;
const INSPECTOR_UPDATE_INTERVAL: f64 = 0.200;
const INSPECTOR_INFO_UPDATE_INTERVAL: f64 = 3.000;
const HEALTH_CHECK_INTERVAL: f64 = 0.100;
const HEALTH_REPORT_INTERVAL: f64 = 4.000;
const STALLED_CHECK_INTERVAL: f64 = 1.000;
pub fn new() -> Self {
let monitor: SharedAppMonitor = AppMonitorDef::default().into();
let schedule_monitor = SharedScheduleMonitor::new();
let (tx_control, rx_control) = std::sync::mpsc::sync_channel(16);
Self {
tx_control,
rx_control,
executor: AppExecutor::new(schedule_monitor.clone(), monitor.clone()),
used_executor: false,
inspector_server: None,
inspector_next_update: Instant::now(),
inspector_next_info_update: Instant::now(),
configure_server: None,
schedule_monitor,
monitor,
monitor_check_next: Instant::now(),
monitor_report_next: Instant::now(),
monitor_was_not_ok: true,
stalled_check_next: Instant::now(),
}
}
pub fn enable_inspector(&mut self, address: &str) -> Result<()> {
self.inspector_server = Some(InspectorServer::open(address)?);
Ok(())
}
pub fn enable_configure(&mut self, address: &str) -> Result<()> {
self.configure_server = Some(ConfigureServer::open(address)?);
Ok(())
}
pub fn setup_monitors(&mut self, monitors: AppMonitorDef) -> Result<SharedAppMonitor> {
if self.used_executor {
panic!("cannot setup monitors after using add_codelet_schedule")
}
let mon: SharedAppMonitor = monitors.into();
self.executor = AppExecutor::new(self.schedule_monitor.clone(), mon.clone());
self.monitor = mon.clone();
Ok(mon)
}
pub fn add_codelet_schedule(&mut self, builder: ScheduleBuilder) {
self.used_executor = true;
self.executor.push(builder)
}
pub fn tx_control(&mut self) -> std::sync::mpsc::SyncSender<RuntimeControl> {
self.tx_control.clone()
}
pub fn enable_terminate_on_ctrl_c(&mut self) {
log::info!("Press Ctrl+C to stop..");
let tx = self.tx_control();
ctrlc::set_handler(move || {
tx.send(RuntimeControl::RequestStop)
.expect("Could not send signal on channel.")
})
.expect("Error setting Ctrl-C handler");
}
pub fn spin(&mut self) {
let sleep_duration = Duration::from_secs_f64(Self::TERMINATION_CHECK_INTERVAL);
loop {
match self.rx_control.recv_timeout(sleep_duration) {
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => {
panic!("control channel disconnected");
}
Ok(req) => {
self.executor.request(req);
}
}
self.executor.process_worker_replies();
if self.executor.is_finished() {
if self.executor.has_panicked() {
panic!("Some workers have panicked.");
} else {
log::info!("All workers finished gracefully.");
break;
}
}
if let Some(configure) = self.configure_server.as_ref() {
configure
.handle_requests(|req| match req {
ConfigureRequest::Configure(config) => {
match self.tx_control.send(RuntimeControl::Configure(config)) {
Ok(()) => ConfigureReply::Success,
Err(err) => ConfigureReply::Failure(format!("{err:?}")),
}
}
ConfigureRequest::List => {
ConfigureReply::List(self.executor.get_parameters_with_properties())
}
})
.ok();
}
let now = Instant::now();
{
let opts = MonitorReportOptions {
include_nominal: true,
always_show_monitors: true,
};
let print_monitor_report_impl = |warn| match self.monitor.as_report_string(opts) {
Ok(report) => {
if warn {
log::warn!("Monitor report:\n{}", report);
} else {
log::info!("Monitor report:\n{}", report);
}
}
Err(err) => {
log::error!("failed to create monitor: {err:?}");
}
};
if now > self.monitor_check_next {
if self.monitor.status() != Ok(MonitorStatus::Nominal) {
self.monitor_was_not_ok = true;
if now > self.monitor_report_next {
print_monitor_report_impl(true);
self.monitor_report_next =
now + Duration::from_secs_f64(Self::HEALTH_REPORT_INTERVAL);
}
} else {
if self.monitor_was_not_ok {
print_monitor_report_impl(false);
self.monitor_was_not_ok = false;
}
self.monitor_report_next =
now + Duration::from_secs_f64(Self::HEALTH_REPORT_INTERVAL);
}
self.monitor_check_next =
now + Duration::from_secs_f64(Self::HEALTH_CHECK_INTERVAL);
}
}
if now > self.stalled_check_next {
self.executor.check_for_stalled_schedules();
self.stalled_check_next =
now + Duration::from_secs_f64(Self::STALLED_CHECK_INTERVAL);
}
if let Some(inspector) = self.inspector_server.as_mut() {
if now > self.inspector_next_update {
self.inspector_next_update =
now + Duration::from_secs_f64(Self::INSPECTOR_UPDATE_INTERVAL);
let include_info = if now > self.inspector_next_info_update {
self.inspector_next_info_update =
now + Duration::from_secs_f64(Self::INSPECTOR_INFO_UPDATE_INTERVAL);
true
} else {
false
};
let settings = ProtoReportSettings { include_info };
match inspector.send_report(self.executor.to_proto_report(&settings)) {
Err(err) => log::error!("inspector could not send report: {err:?}"),
Ok(()) => {}
}
}
}
}
self.executor.finalize();
proto_report_pretty_print(
self.executor
.to_proto_report(&ProtoReportSettings::default()),
)
.ok();
}
#[deprecated(since = "0.2.0", note = "use `enable_terminate_on_ctrl_c` instead")]
pub fn wait_for_ctrl_c(&mut self) {
self.enable_terminate_on_ctrl_c();
self.spin();
}
}