use crate::{accurate_sleep_until, proto_report::proto_report_from_app, ScheduleExecutor};
use nodo::{
app::{App, SharedScheduleMonitor},
codelet::{LifecycleStatus, ScheduleBuilder},
monitors::SharedAppMonitor,
prelude::{ParameterSet, ParameterWithPropertiesSet, RuntimeControl},
};
use serde::{Deserialize, Serialize};
use std::{
panic::AssertUnwindSafe,
sync::{atomic, atomic::AtomicBool, Arc},
};
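/// Runs an [`App`] by executing each pushed schedule on its own worker thread.
///
/// Rough usage sketch; `schedule_monitor`, `app_monitor` and `schedule_builder`
/// are assumed to come from the surrounding `nodo` setup:
///
/// ```ignore
/// let mut executor = AppExecutor::new(schedule_monitor, app_monitor);
/// executor.push(schedule_builder); // spawns one worker thread per schedule
/// while !executor.is_finished() {
///     executor.process_worker_replies();
///     executor.check_for_stalled_schedules();
/// }
/// executor.request(RuntimeControl::RequestStop); // stops and joins all workers
/// ```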
pub struct AppExecutor {
app: App,
workers: Vec<Worker>,
}
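/// Control messages sent from the [`AppExecutor`] to a worker thread: either
/// stop the schedule or apply a set of parameter changes.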
#[derive(Clone)]
pub enum WorkerRequest {
Stop,
Configure(ParameterSet<String, String>),
}
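/// Status messages sent back from a worker thread to its [`Worker`] handle.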
pub enum WorkerReply {
Panic,
Finished,
}
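/// Numeric identifier of a worker.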
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct WorkerId(pub u32);
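/// Everything that is moved onto the worker thread: the shared schedule
/// monitor, the schedule to execute, and the request/reply channel endpoints.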
pub struct WorkerState {
monitor: SharedScheduleMonitor,
schedule: ScheduleExecutor,
rx_request: std::sync::mpsc::Receiver<WorkerRequest>,
tx_reply: std::sync::mpsc::Sender<WorkerReply>,
}
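/// Options controlling how [`AppExecutor::to_proto_report`] builds its report.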
#[derive(Default)]
pub struct ProtoReportSettings {
pub include_info: bool,
}
impl AppExecutor {
pub fn new(schedule_monitor: SharedScheduleMonitor, nodelet_monitor: SharedAppMonitor) -> Self {
Self {
app: App::new(nodelet_monitor, schedule_monitor),
workers: Vec::new(),
}
}
pub fn app(&self) -> &App {
&self.app
}
pub fn to_proto_report(&self, settings: &ProtoReportSettings) -> crate::proto::nodo::Report {
proto_report_from_app(&self.app, settings)
}
pub fn get_parameters_with_properties(&self) -> ParameterWithPropertiesSet<String, String> {
let mut result = ParameterWithPropertiesSet::default();
for worker in self.workers.iter() {
result.extend(worker.get_parameters_with_properties().clone());
}
result
}
pub fn check_for_stalled_schedules(&self) {
self.app.check_for_stalled_schedules()
}
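    /// Builds a [`ScheduleExecutor`] from `builder` and spawns a worker thread
    /// that executes it.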
pub fn push(&mut self, builder: ScheduleBuilder) {
let executor = ScheduleExecutor::from_builder(&mut self.app, builder);
let worker = Worker::new(self.app.schedule_monitor().clone(), executor);
self.workers.push(worker);
}
pub fn is_finished(&self) -> bool {
self.workers.iter().all(|w| w.is_finished())
}
    pub fn has_panicked(&self) -> bool {
        self.workers.iter().any(|w| w.has_panicked())
    }
pub fn process_worker_replies(&mut self) {
for w in self.workers.iter_mut() {
w.process_replies();
}
}
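    /// Joins all worker threads and logs an error for every worker that
    /// panicked.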
pub fn finalize(&mut self) {
for w in self.workers.iter_mut() {
w.finalize();
if w.has_panicked() {
log::error!("Worker thread '{}' has panicked.", w.name)
}
}
}
pub fn request(&mut self, ctrl: RuntimeControl) {
match ctrl {
RuntimeControl::RequestStop => {
log::info!("Stop requested..");
self.request_stop();
self.finalize();
log::info!("All workers stopped.");
}
RuntimeControl::Configure(changes) => {
log::debug!("Configure request: {changes:?}");
self.request_configure(changes);
}
}
}
fn request_stop(&mut self) {
for w in self.workers.iter_mut() {
w.send_request(WorkerRequest::Stop)
.map_err(|err| {
log::error!(
"Could not request worker '{}' to stop: {err:?}. Maybe it panicked previously.",
w.name
)
})
.ok();
}
}
fn request_configure(&mut self, changes: ParameterSet<String, String>) {
for w in self.workers.iter_mut() {
w.send_request(WorkerRequest::Configure(changes.clone()))
.ok();
}
}
}
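/// Handle to a schedule running on a dedicated thread, communicating via
/// request/reply channels and a shared panic flag.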
pub struct Worker {
name: String,
params: ParameterWithPropertiesSet<String, String>,
tx_request: std::sync::mpsc::Sender<WorkerRequest>,
rx_reply: std::sync::mpsc::Receiver<WorkerReply>,
thread: Option<std::thread::JoinHandle<()>>,
has_finished: bool,
has_panicked: Arc<AtomicBool>,
}
impl Worker {
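    /// Spawns a thread named after the schedule; the thread runs
    /// `worker_thread` and records whether the schedule panicked in the shared
    /// `has_panicked` flag.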
pub fn new(monitor: SharedScheduleMonitor, schedule: ScheduleExecutor) -> Self {
let (tx_request, rx_request) = std::sync::mpsc::channel();
let (tx_reply, rx_reply) = std::sync::mpsc::channel();
let name = schedule.name().to_string();
let params = schedule.get_parameters_with_properties();
let state = WorkerState {
monitor,
schedule,
rx_request,
tx_reply,
};
let has_panicked = Arc::new(AtomicBool::new(false));
let has_panicked_2 = has_panicked.clone();
Self {
name: name.clone(),
params: params.into(),
tx_request,
rx_reply,
thread: Some(
std::thread::Builder::new()
.name(name)
.spawn(move || {
has_panicked_2.store(worker_thread(state), atomic::Ordering::Relaxed)
})
.unwrap(),
),
has_finished: false,
has_panicked,
}
}
pub fn get_parameters_with_properties(&self) -> &ParameterWithPropertiesSet<String, String> {
&self.params
}
pub fn is_finished(&self) -> bool {
self.has_panicked() || self.has_finished
}
pub fn has_panicked(&self) -> bool {
self.has_panicked.load(atomic::Ordering::Relaxed)
}
pub fn send_request(&mut self, request: WorkerRequest) -> Result<(), ()> {
self.tx_request.send(request.clone()).map_err(|_| ())?;
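        // Mirror configuration changes into the locally cached parameter set
        // so that `get_parameters_with_properties` reflects the requested values.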
if let WorkerRequest::Configure(changes) = request {
for (k, v) in changes.iter() {
if let Some(entry) = self.params.0.get_mut(k) {
entry.1 = v.clone();
}
}
}
Ok(())
}
pub fn process_replies(&mut self) {
while let Ok(reply) = self.rx_reply.try_recv() {
self.process_reply(reply)
}
}
fn process_replies_finalize(&mut self) {
while let Ok(reply) = self.rx_reply.recv() {
self.process_reply(reply);
}
}
fn process_reply(&mut self, reply: WorkerReply) {
match reply {
WorkerReply::Panic => {
log::error!("worker {} panicked", self.name);
self.has_panicked.store(true, atomic::Ordering::Relaxed);
}
WorkerReply::Finished => {
self.has_finished = true;
}
}
}
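    /// Joins the worker thread and processes any replies still queued in the
    /// channel.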
pub fn finalize(&mut self) {
if let Some(thread) = self.thread.take() {
thread.join().map_err(|_| ()).ok();
}
self.process_replies_finalize();
}
}
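/// Entry point of a worker thread: runs `worker_thread_impl`, catches any
/// panic, updates the schedule monitor, and returns `true` if a panic occurred.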
fn worker_thread(state: WorkerState) -> bool {
let id = state.schedule.id();
let name = state.schedule.name().to_string();
let thread_id = state.schedule.thread_id().clone();
let monitors = state.monitor.clone();
let has_panicked =
match std::panic::catch_unwind(AssertUnwindSafe(|| worker_thread_impl(state))) {
Err(_) => {
log::error!("stopping worker {name:?} thread (id={thread_id}) after panic",);
if let Err(err) = monitors.edit(id, |m| {
m.last_error = Some(format!("worker panicked"));
}) {
log::error!("failed to update schedule monitor: {err:?}");
}
true
}
            Ok(()) => false,
};
if let Err(err) = monitors.edit(id, |m| {
m.has_panicked = has_panicked;
m.has_finished = true;
m.lifecycle_status = if has_panicked {
LifecycleStatus::Error
} else {
LifecycleStatus::Inactive
}
}) {
log::error!("failed to update schedule monitor: {err:?}");
}
has_panicked
}
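/// Main worker loop: wait for the next period, handle pending control
/// requests, spin the schedule, and publish its status to the monitor until it
/// is stopped or terminates on its own.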
fn worker_thread_impl(mut state: WorkerState) {
loop {
        // For periodic schedules, compute the instant at which the next
        // iteration should start.
        let maybe_next_instant = state
            .schedule
            .period()
            .and_then(|period| state.schedule.last_instant().map(|t| t + period));
if let Some(next_instant) = maybe_next_instant {
accurate_sleep_until(next_instant);
}
        // Handle at most one pending control request per iteration.
        match state.rx_request.try_recv() {
            Ok(WorkerRequest::Stop) => break,
            Ok(WorkerRequest::Configure(config)) => {
                state.schedule.configure(&config);
            }
            // No request pending (or the requester hung up): keep spinning.
            Err(_) => {}
        }
state.schedule.spin();
if let Err(err) = state.monitor.edit(state.schedule.id(), |m| {
m.last_period = state.schedule.last_period();
m.lifecycle_status = state.schedule.lifecycle_status();
}) {
log::error!("failed to update schedule monitor: {err:?}");
}
if state.schedule.is_terminated() {
break;
}
}
state.schedule.finalize();
state.tx_reply.send(WorkerReply::Finished).ok();
}