// nodo_runtime/app_executor.rs
// Copyright 2023 David Weikersdorfer
2
3use crate::{accurate_sleep_until, proto_report::proto_report_from_app, ScheduleExecutor};
4use nodo::{
5    app::{App, SharedScheduleMonitor},
6    codelet::{LifecycleStatus, ScheduleBuilder},
7    monitors::SharedAppMonitor,
8    prelude::{ParameterSet, ParameterWithPropertiesSet, RuntimeControl},
9};
10use serde::{Deserialize, Serialize};
11use std::{
12    panic::AssertUnwindSafe,
13    sync::{atomic, atomic::AtomicBool, Arc},
14};
15
/// Executes an app, i.e. a parallel set of node schedules. Each schedule is executed in its own
/// worker thread.
pub struct AppExecutor {
    /// The app being executed; owns the monitors shared with the worker threads.
    app: App,
    /// One worker (thread plus request/reply channels) per schedule added via `push`.
    workers: Vec<Worker>,
}
22
/// Control requests sent from the executor to a worker thread.
#[derive(Clone)]
pub enum WorkerRequest {
    /// Ask the worker to leave its schedule loop and finalize.
    Stop,
    /// Apply the given parameter changes to the worker's schedule.
    Configure(ParameterSet<String, String>),
}
28
/// Replies sent from a worker thread back to its owning `Worker` handle.
pub enum WorkerReply {
    /// The worker has panicked
    Panic,

    /// The worker has finished
    Finished,
}
36
/// Unique identifier of a worker (i.e. thread)
///
/// Newtype over `u32` so worker ids cannot be confused with other integer ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct WorkerId(pub u32);
40
/// State moved into a worker thread: the schedule to execute plus its
/// communication channels back to the owning `Worker`.
pub struct WorkerState {
    /// Shared monitor used to publish schedule status and period updates.
    monitor: SharedScheduleMonitor,
    /// The schedule this worker executes.
    schedule: ScheduleExecutor,
    /// Receives control requests (stop/configure) from the executor side.
    rx_request: std::sync::mpsc::Receiver<WorkerRequest>,
    /// Sends completion replies back to the owning `Worker`.
    tx_reply: std::sync::mpsc::Sender<WorkerReply>,
}
47
/// Settings controlling what is included in a generated proto report
/// (consumed by `proto_report_from_app`).
#[derive(Default)]
pub struct ProtoReportSettings {
    /// Whether to include additional informational detail in the report.
    pub include_info: bool,
}
52
53impl AppExecutor {
54    pub fn new(schedule_monitor: SharedScheduleMonitor, nodelet_monitor: SharedAppMonitor) -> Self {
55        Self {
56            app: App::new(nodelet_monitor, schedule_monitor),
57            workers: Vec::new(),
58        }
59    }
60
61    pub fn app(&self) -> &App {
62        &self.app
63    }
64
65    pub fn to_proto_report(&self, settings: &ProtoReportSettings) -> crate::proto::nodo::Report {
66        proto_report_from_app(&self.app, settings)
67    }
68
69    pub fn get_parameters_with_properties(&self) -> ParameterWithPropertiesSet<String, String> {
70        let mut result = ParameterWithPropertiesSet::default();
71        for worker in self.workers.iter() {
72            result.extend(worker.get_parameters_with_properties().clone());
73        }
74        result
75    }
76
77    pub fn check_for_stalled_schedules(&self) {
78        self.app.check_for_stalled_schedules()
79    }
80
81    pub fn push(&mut self, builder: ScheduleBuilder) {
82        let executor = ScheduleExecutor::from_builder(&mut self.app, builder);
83
84        let worker = Worker::new(self.app.schedule_monitor().clone(), executor);
85
86        self.workers.push(worker);
87    }
88
89    pub fn is_finished(&self) -> bool {
90        self.workers.iter().all(|w| w.is_finished())
91    }
92
93    pub fn has_panicked(&self) -> bool {
94        self.workers
95            .iter()
96            .any(|w| w.has_panicked.load(atomic::Ordering::Relaxed))
97    }
98
99    pub fn process_worker_replies(&mut self) {
100        for w in self.workers.iter_mut() {
101            w.process_replies();
102        }
103    }
104
105    pub fn finalize(&mut self) {
106        for w in self.workers.iter_mut() {
107            w.finalize();
108            if w.has_panicked() {
109                log::error!("Worker thread '{}' has panicked.", w.name)
110            }
111        }
112    }
113
114    pub fn request(&mut self, ctrl: RuntimeControl) {
115        match ctrl {
116            RuntimeControl::RequestStop => {
117                log::info!("Stop requested..");
118                self.request_stop();
119                self.finalize();
120                log::info!("All workers stopped.");
121            }
122            RuntimeControl::Configure(changes) => {
123                log::debug!("Configure request: {changes:?}");
124                self.request_configure(changes);
125            }
126        }
127    }
128
129    fn request_stop(&mut self) {
130        for w in self.workers.iter_mut() {
131            w.send_request(WorkerRequest::Stop)
132                .map_err(|err| {
133                    log::error!(
134                        "Could not request worker '{}' to stop: {err:?}. Maybe it panicked previously.",
135                        w.name
136                    )
137                })
138                .ok();
139        }
140    }
141
142    fn request_configure(&mut self, changes: ParameterSet<String, String>) {
143        for w in self.workers.iter_mut() {
144            w.send_request(WorkerRequest::Configure(changes.clone()))
145                .ok();
146        }
147    }
148}
149
/// Handle for a single schedule worker thread, including its control channels.
pub struct Worker {
    /// Name of the schedule; also used as the spawned thread's name.
    name: String,
    /// Cached copy of the schedule's parameters; updated when configure requests are sent.
    params: ParameterWithPropertiesSet<String, String>,
    /// Sends control requests to the worker thread.
    tx_request: std::sync::mpsc::Sender<WorkerRequest>,
    /// Receives replies from the worker thread.
    rx_reply: std::sync::mpsc::Receiver<WorkerReply>,
    /// Join handle of the worker thread; `None` once `finalize` has joined it.
    thread: Option<std::thread::JoinHandle<()>>,
    /// Set when a `Finished` reply has been processed.
    has_finished: bool,
    /// Set by the worker thread (and on a `Panic` reply) if the schedule panicked.
    has_panicked: Arc<AtomicBool>,
}
159
160impl Worker {
161    pub fn new(monitor: SharedScheduleMonitor, schedule: ScheduleExecutor) -> Self {
162        let (tx_request, rx_request) = std::sync::mpsc::channel();
163        let (tx_reply, rx_reply) = std::sync::mpsc::channel();
164        let name = schedule.name().to_string();
165        let params = schedule.get_parameters_with_properties();
166        let state = WorkerState {
167            monitor,
168            schedule,
169            rx_request,
170            tx_reply,
171        };
172        let has_panicked = Arc::new(AtomicBool::new(false));
173        let has_panicked_2 = has_panicked.clone();
174        Self {
175            name: name.clone(),
176            params: params.into(),
177            tx_request,
178            rx_reply,
179            thread: Some(
180                std::thread::Builder::new()
181                    .name(name)
182                    .spawn(move || {
183                        has_panicked_2.store(worker_thread(state), atomic::Ordering::Relaxed)
184                    })
185                    .unwrap(),
186            ),
187            has_finished: false,
188            has_panicked,
189        }
190    }
191
192    pub fn get_parameters_with_properties(&self) -> &ParameterWithPropertiesSet<String, String> {
193        &self.params
194    }
195
196    pub fn is_finished(&self) -> bool {
197        self.has_panicked() || self.has_finished
198    }
199
200    pub fn has_panicked(&self) -> bool {
201        self.has_panicked.load(atomic::Ordering::Relaxed)
202    }
203
204    pub fn send_request(&mut self, request: WorkerRequest) -> Result<(), ()> {
205        self.tx_request.send(request.clone()).map_err(|_| ())?;
206
207        if let WorkerRequest::Configure(changes) = request {
208            for (k, v) in changes.iter() {
209                if let Some(entry) = self.params.0.get_mut(k) {
210                    entry.1 = v.clone();
211                }
212            }
213        }
214
215        Ok(())
216    }
217
218    pub fn process_replies(&mut self) {
219        while let Ok(reply) = self.rx_reply.try_recv() {
220            self.process_reply(reply)
221        }
222    }
223
224    fn process_replies_finalize(&mut self) {
225        while let Ok(reply) = self.rx_reply.recv() {
226            self.process_reply(reply);
227        }
228    }
229
230    fn process_reply(&mut self, reply: WorkerReply) {
231        match reply {
232            WorkerReply::Panic => {
233                log::error!("worker {} panicked", self.name);
234                self.has_panicked.store(true, atomic::Ordering::Relaxed);
235            }
236            WorkerReply::Finished => {
237                self.has_finished = true;
238            }
239        }
240    }
241
242    pub fn finalize(&mut self) {
243        if let Some(thread) = self.thread.take() {
244            thread.join().map_err(|_| ()).ok();
245        }
246        self.process_replies_finalize();
247    }
248}
249
250fn worker_thread(state: WorkerState) -> bool {
251    let id = state.schedule.id();
252    let name = state.schedule.name().to_string();
253    let thread_id = state.schedule.thread_id().clone();
254    let monitors = state.monitor.clone();
255
256    // TODO verify that AssertUnwindSafe is ok
257    let has_panicked =
258        match std::panic::catch_unwind(AssertUnwindSafe(|| worker_thread_impl(state))) {
259            Err(_) => {
260                log::error!("stopping worker {name:?} thread (id={thread_id}) after panic",);
261
262                if let Err(err) = monitors.edit(id, |m| {
263                    m.last_error = Some(format!("worker panicked"));
264                }) {
265                    log::error!("failed to update schedule monitor: {err:?}");
266                }
267
268                true
269            }
270            Ok(_state) => false,
271        };
272
273    if let Err(err) = monitors.edit(id, |m| {
274        m.has_panicked = has_panicked;
275        m.has_finished = true;
276
277        m.lifecycle_status = if has_panicked {
278            LifecycleStatus::Error
279        } else {
280            LifecycleStatus::Inactive
281        }
282    }) {
283        log::error!("failed to update schedule monitor: {err:?}");
284    }
285
286    has_panicked
287}
288
289fn worker_thread_impl(mut state: WorkerState) {
290    loop {
291        // Wait until next period. Be careful not to hold a lock on state while sleeping.
292        let maybe_next_instant = {
293            if let Some(period) = state.schedule.period() {
294                state.schedule.last_instant().map(|t| t + period)
295            } else {
296                None
297            }
298        };
299        if let Some(next_instant) = maybe_next_instant {
300            accurate_sleep_until(next_instant);
301        }
302
303        // handle requests
304        match state.rx_request.try_recv() {
305            Ok(WorkerRequest::Stop) => break,
306            Ok(WorkerRequest::Configure(config)) => {
307                state.schedule.configure(&config);
308            }
309            Err(_) => {
310                // FIXME
311            }
312        };
313
314        // execute
315        state.schedule.spin();
316
317        // update period in monitor
318        if let Err(err) = state.monitor.edit(state.schedule.id(), |m| {
319            m.last_period = state.schedule.last_period();
320            m.lifecycle_status = state.schedule.lifecycle_status();
321        }) {
322            log::error!("failed to update schedule monitor: {err:?}");
323        }
324
325        if state.schedule.is_terminated() {
326            break;
327        }
328    }
329
330    state.schedule.finalize();
331
332    state.tx_reply.send(WorkerReply::Finished).ok();
333}