// Copyright 2023 David Weikersdorfer

use crate::{accurate_sleep_until, proto_report::proto_report_from_app, ScheduleExecutor};
use nodo::{
    app::{App, SharedScheduleMonitor},
    codelet::{LifecycleStatus, ScheduleBuilder},
    monitors::SharedAppMonitor,
    prelude::{ParameterSet, ParameterWithPropertiesSet, RuntimeControl},
};
use serde::{Deserialize, Serialize};
use std::{
    panic::AssertUnwindSafe,
    sync::{atomic, atomic::AtomicBool, Arc},
};

/// Executes an app, i.e. a parallel set of node schedules. Each schedule is executed in its own
/// worker thread.
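///
/// A minimal usage sketch; `schedule_monitor`, `app_monitor`, `schedule_builder`,
/// and the polling interval are placeholders for values produced during app setup:
///
/// ```ignore
/// let mut executor = AppExecutor::new(schedule_monitor, app_monitor);
/// executor.push(schedule_builder); // spawns one worker thread per schedule
/// while !executor.is_finished() {
///     executor.process_worker_replies();
///     std::thread::sleep(std::time::Duration::from_millis(10));
/// }
/// executor.finalize();
/// ```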
pub struct AppExecutor {
    app: App,
    workers: Vec<Worker>,
}

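/// Control requests sent from the executor to its worker threads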
#[derive(Clone)]
pub enum WorkerRequest {
    /// Stop the worker after its current iteration
    Stop,

    /// Apply a set of parameter changes to the worker's schedule
    Configure(ParameterSet<String, String>),
}

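/// Replies sent from worker threads back to the executor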
pub enum WorkerReply {
    /// The worker has panicked
    Panic,

    /// The worker has finished
    Finished,
}

/// Unique identifier of a worker (i.e. thread)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct WorkerId(pub u32);

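/// State owned by a worker thread: the schedule it executes, its monitor, and the
/// request/reply channels connecting it to the executor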
pub struct WorkerState {
    monitor: SharedScheduleMonitor,
    schedule: ScheduleExecutor,
    rx_request: std::sync::mpsc::Receiver<WorkerRequest>,
    tx_reply: std::sync::mpsc::Sender<WorkerReply>,
}

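/// Settings controlling the contents of a generated proto report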
#[derive(Default)]
pub struct ProtoReportSettings {
    pub include_info: bool,
}

impl AppExecutor {
    pub fn new(schedule_monitor: SharedScheduleMonitor, nodelet_monitor: SharedAppMonitor) -> Self {
        Self {
            app: App::new(nodelet_monitor, schedule_monitor),
            workers: Vec::new(),
        }
    }

    pub fn app(&self) -> &App {
        &self.app
    }

    pub fn to_proto_report(&self, settings: &ProtoReportSettings) -> crate::proto::nodo::Report {
        proto_report_from_app(&self.app, settings)
    }

    pub fn get_parameters_with_properties(&self) -> ParameterWithPropertiesSet<String, String> {
        let mut result = ParameterWithPropertiesSet::default();
        for worker in self.workers.iter() {
            result.extend(worker.get_parameters_with_properties().clone());
        }
        result
    }

    pub fn check_for_stalled_schedules(&self) {
        self.app.check_for_stalled_schedules()
    }

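    /// Builds a schedule executor from the given builder and spawns a worker thread to run it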
    pub fn push(&mut self, builder: ScheduleBuilder) {
        let executor = ScheduleExecutor::from_builder(&mut self.app, builder);

        let worker = Worker::new(self.app.schedule_monitor().clone(), executor);

        self.workers.push(worker);
    }

    pub fn is_finished(&self) -> bool {
        self.workers.iter().all(|w| w.is_finished())
    }

    pub fn has_panicked(&self) -> bool {
        self.workers.iter().any(|w| w.has_panicked())
    }

    pub fn process_worker_replies(&mut self) {
        for w in self.workers.iter_mut() {
            w.process_replies();
        }
    }

    pub fn finalize(&mut self) {
        for w in self.workers.iter_mut() {
            w.finalize();
            if w.has_panicked() {
                log::error!("Worker thread '{}' has panicked.", w.name)
            }
        }
    }

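    /// Dispatches a runtime control message to all workers
    ///
    /// A sketch, assuming `executor` is a running `AppExecutor`:
    ///
    /// ```ignore
    /// // stop all workers and block until their threads have been joined
    /// executor.request(RuntimeControl::RequestStop);
    /// ```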
    pub fn request(&mut self, ctrl: RuntimeControl) {
        match ctrl {
            RuntimeControl::RequestStop => {
                log::info!("Stop requested..");
                self.request_stop();
                self.finalize();
                log::info!("All workers stopped.");
            }
            RuntimeControl::Configure(changes) => {
                log::debug!("Configure request: {changes:?}");
                self.request_configure(changes);
            }
        }
    }

    fn request_stop(&mut self) {
        for w in self.workers.iter_mut() {
            if w.send_request(WorkerRequest::Stop).is_err() {
                log::error!(
                    "Could not request worker '{}' to stop. Maybe it panicked previously.",
                    w.name
                );
            }
        }
    }

    fn request_configure(&mut self, changes: ParameterSet<String, String>) {
        for w in self.workers.iter_mut() {
            w.send_request(WorkerRequest::Configure(changes.clone()))
                .ok();
        }
    }
}

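/// Handle to a single worker thread executing one schedule. Communicates with the
/// thread through request/reply channels and tracks whether it has finished or
/// panicked.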
pub struct Worker {
    name: String,
    params: ParameterWithPropertiesSet<String, String>,
    tx_request: std::sync::mpsc::Sender<WorkerRequest>,
    rx_reply: std::sync::mpsc::Receiver<WorkerReply>,
    thread: Option<std::thread::JoinHandle<()>>,
    has_finished: bool,
    has_panicked: Arc<AtomicBool>,
}

impl Worker {
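    /// Spawns a worker thread which executes the given schedule and reports back
    /// through the returned handle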
    pub fn new(monitor: SharedScheduleMonitor, schedule: ScheduleExecutor) -> Self {
        let (tx_request, rx_request) = std::sync::mpsc::channel();
        let (tx_reply, rx_reply) = std::sync::mpsc::channel();
        let name = schedule.name().to_string();
        let params = schedule.get_parameters_with_properties();
        let state = WorkerState {
            monitor,
            schedule,
            rx_request,
            tx_reply,
        };
        let has_panicked = Arc::new(AtomicBool::new(false));
        let has_panicked_in_thread = has_panicked.clone();
        Self {
            name: name.clone(),
            params: params.into(),
            tx_request,
            rx_reply,
            thread: Some(
                std::thread::Builder::new()
                    .name(name)
                    .spawn(move || {
                        // `worker_thread` returns true iff the schedule panicked
                        has_panicked_in_thread
                            .store(worker_thread(state), atomic::Ordering::Relaxed)
                    })
                    .unwrap(),
            ),
            has_finished: false,
            has_panicked,
        }
    }

    pub fn get_parameters_with_properties(&self) -> &ParameterWithPropertiesSet<String, String> {
        &self.params
    }

    pub fn is_finished(&self) -> bool {
        self.has_panicked() || self.has_finished
    }

    pub fn has_panicked(&self) -> bool {
        self.has_panicked.load(atomic::Ordering::Relaxed)
    }

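    /// Forwards a request to the worker thread. Configuration changes are also mirrored
    /// into the cached parameter set so that `get_parameters_with_properties` reflects
    /// the latest values. Returns `Err(())` if the worker's request channel is closed,
    /// e.g. because the thread already shut down.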
    pub fn send_request(&mut self, request: WorkerRequest) -> Result<(), ()> {
        self.tx_request.send(request.clone()).map_err(|_| ())?;

        if let WorkerRequest::Configure(changes) = request {
            for (k, v) in changes.iter() {
                if let Some(entry) = self.params.0.get_mut(k) {
                    entry.1 = v.clone();
                }
            }
        }

        Ok(())
    }

    pub fn process_replies(&mut self) {
        while let Ok(reply) = self.rx_reply.try_recv() {
            self.process_reply(reply)
        }
    }

    fn process_replies_finalize(&mut self) {
        while let Ok(reply) = self.rx_reply.recv() {
            self.process_reply(reply);
        }
    }

    fn process_reply(&mut self, reply: WorkerReply) {
        match reply {
            WorkerReply::Panic => {
                log::error!("worker {} panicked", self.name);
                self.has_panicked.store(true, atomic::Ordering::Relaxed);
            }
            WorkerReply::Finished => {
                self.has_finished = true;
            }
        }
    }

    pub fn finalize(&mut self) {
        if let Some(thread) = self.thread.take() {
            thread.join().ok();
        }
        self.process_replies_finalize();
    }
}

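/// Entry point of a worker thread: runs the schedule loop inside `catch_unwind`,
/// records the outcome in the schedule monitor, and returns `true` iff the schedule
/// panicked.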
fn worker_thread(state: WorkerState) -> bool {
    let id = state.schedule.id();
    let name = state.schedule.name().to_string();
    let thread_id = state.schedule.thread_id().clone();
    let monitors = state.monitor.clone();

    // TODO: verify that `AssertUnwindSafe` is sound here; `state` is consumed by the
    // closure, so a panic should not leave broken state observable from this thread.
    let has_panicked =
        match std::panic::catch_unwind(AssertUnwindSafe(|| worker_thread_impl(state))) {
            Err(_) => {
                log::error!("stopping worker {name:?} thread (id={thread_id}) after panic");

                if let Err(err) = monitors.edit(id, |m| {
                    m.last_error = Some("worker panicked".to_string());
                }) {
                    log::error!("failed to update schedule monitor: {err:?}");
                }

                true
            }
            Ok(()) => false,
        };

    if let Err(err) = monitors.edit(id, |m| {
        m.has_panicked = has_panicked;
        m.has_finished = true;

        m.lifecycle_status = if has_panicked {
            LifecycleStatus::Error
        } else {
            LifecycleStatus::Inactive
        }
    }) {
        log::error!("failed to update schedule monitor: {err:?}");
    }

    has_panicked
}

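/// Core loop of a worker thread: sleep until the next period, handle pending control
/// requests, spin the schedule once, and publish its status to the monitor. Exits on
/// a stop request, a disconnected request channel, or when the schedule terminates;
/// afterwards the schedule is finalized and `Finished` is reported.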
fn worker_thread_impl(mut state: WorkerState) {
    loop {
        // Wait until the next period. Compute the target instant first so that no lock on
        // the schedule is held while sleeping.
        let maybe_next_instant = state
            .schedule
            .period()
            .and_then(|period| state.schedule.last_instant().map(|t| t + period));
        if let Some(next_instant) = maybe_next_instant {
            accurate_sleep_until(next_instant);
        }

        // Handle control requests. An empty channel just means there is no new request;
        // a disconnected channel means the executor is gone and the worker should stop.
        match state.rx_request.try_recv() {
            Ok(WorkerRequest::Stop) => break,
            Ok(WorkerRequest::Configure(config)) => {
                state.schedule.configure(&config);
            }
            Err(std::sync::mpsc::TryRecvError::Empty) => {}
            Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
        }

        // execute
        state.schedule.spin();

        // update period in monitor
        if let Err(err) = state.monitor.edit(state.schedule.id(), |m| {
            m.last_period = state.schedule.last_period();
            m.lifecycle_status = state.schedule.lifecycle_status();
        }) {
            log::error!("failed to update schedule monitor: {err:?}");
        }

        if state.schedule.is_terminated() {
            break;
        }
    }

    state.schedule.finalize();

    state.tx_reply.send(WorkerReply::Finished).ok();
}