1use crate::{accurate_sleep_until, proto_report::proto_report_from_app, ScheduleExecutor};
4use nodo::{
5 app::{App, SharedScheduleMonitor},
6 codelet::{LifecycleStatus, ScheduleBuilder},
7 monitors::SharedAppMonitor,
8 prelude::{ParameterSet, ParameterWithPropertiesSet, RuntimeControl},
9};
10use serde::{Deserialize, Serialize};
11use std::{
12 panic::AssertUnwindSafe,
13 sync::{atomic, atomic::AtomicBool, Arc},
14};
15
/// Owns the [`App`] plus one [`Worker`] (and its thread) per schedule
/// registered via [`AppExecutor::push`].
pub struct AppExecutor {
    /// The application whose schedules are being executed.
    app: App,
    /// One handle per spawned schedule thread.
    workers: Vec<Worker>,
}
22
/// Control messages sent from the executor to a worker thread.
#[derive(Clone)]
pub enum WorkerRequest {
    /// Ask the worker to leave its spin loop and finalize its schedule.
    Stop,
    /// Apply the given parameter changes to the worker's schedule.
    Configure(ParameterSet<String, String>),
}
28
/// Status messages sent from a worker thread back to its [`Worker`] handle.
pub enum WorkerReply {
    /// The worker's schedule panicked.
    /// NOTE(review): no sender of this variant is visible in this file —
    /// panics appear to be reported through the shared `AtomicBool` instead;
    /// confirm against other call sites before relying on it.
    Panic,

    /// The worker exited its loop and finalized its schedule.
    Finished,
}
36
/// Stable numeric identifier for a worker; ordered, hashable and
/// (de)serializable so it can be used as a map key or in reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct WorkerId(pub u32);
40
/// Everything moved onto a worker thread: the schedule to run plus the
/// channel endpoints connecting it to its owning [`Worker`] handle.
pub struct WorkerState {
    /// Shared monitor used to publish per-schedule status updates.
    monitor: SharedScheduleMonitor,
    /// The schedule this worker spins.
    schedule: ScheduleExecutor,
    /// Incoming control requests (stop / configure) from the owning handle.
    rx_request: std::sync::mpsc::Receiver<WorkerRequest>,
    /// Outgoing status replies to the owning handle.
    tx_reply: std::sync::mpsc::Sender<WorkerReply>,
}
47
/// Options controlling what is included in a generated protobuf report.
#[derive(Default)]
pub struct ProtoReportSettings {
    /// Whether to include additional informational detail in the report.
    /// NOTE(review): exact semantics are defined by `proto_report_from_app`;
    /// confirm there.
    pub include_info: bool,
}
52
53impl AppExecutor {
54 pub fn new(schedule_monitor: SharedScheduleMonitor, nodelet_monitor: SharedAppMonitor) -> Self {
55 Self {
56 app: App::new(nodelet_monitor, schedule_monitor),
57 workers: Vec::new(),
58 }
59 }
60
61 pub fn app(&self) -> &App {
62 &self.app
63 }
64
65 pub fn to_proto_report(&self, settings: &ProtoReportSettings) -> crate::proto::nodo::Report {
66 proto_report_from_app(&self.app, settings)
67 }
68
69 pub fn get_parameters_with_properties(&self) -> ParameterWithPropertiesSet<String, String> {
70 let mut result = ParameterWithPropertiesSet::default();
71 for worker in self.workers.iter() {
72 result.extend(worker.get_parameters_with_properties().clone());
73 }
74 result
75 }
76
77 pub fn check_for_stalled_schedules(&self) {
78 self.app.check_for_stalled_schedules()
79 }
80
81 pub fn push(&mut self, builder: ScheduleBuilder) {
82 let executor = ScheduleExecutor::from_builder(&mut self.app, builder);
83
84 let worker = Worker::new(self.app.schedule_monitor().clone(), executor);
85
86 self.workers.push(worker);
87 }
88
89 pub fn is_finished(&self) -> bool {
90 self.workers.iter().all(|w| w.is_finished())
91 }
92
93 pub fn has_panicked(&self) -> bool {
94 self.workers
95 .iter()
96 .any(|w| w.has_panicked.load(atomic::Ordering::Relaxed))
97 }
98
99 pub fn process_worker_replies(&mut self) {
100 for w in self.workers.iter_mut() {
101 w.process_replies();
102 }
103 }
104
105 pub fn finalize(&mut self) {
106 for w in self.workers.iter_mut() {
107 w.finalize();
108 if w.has_panicked() {
109 log::error!("Worker thread '{}' has panicked.", w.name)
110 }
111 }
112 }
113
114 pub fn request(&mut self, ctrl: RuntimeControl) {
115 match ctrl {
116 RuntimeControl::RequestStop => {
117 log::info!("Stop requested..");
118 self.request_stop();
119 self.finalize();
120 log::info!("All workers stopped.");
121 }
122 RuntimeControl::Configure(changes) => {
123 log::debug!("Configure request: {changes:?}");
124 self.request_configure(changes);
125 }
126 }
127 }
128
129 fn request_stop(&mut self) {
130 for w in self.workers.iter_mut() {
131 w.send_request(WorkerRequest::Stop)
132 .map_err(|err| {
133 log::error!(
134 "Could not request worker '{}' to stop: {err:?}. Maybe it panicked previously.",
135 w.name
136 )
137 })
138 .ok();
139 }
140 }
141
142 fn request_configure(&mut self, changes: ParameterSet<String, String>) {
143 for w in self.workers.iter_mut() {
144 w.send_request(WorkerRequest::Configure(changes.clone()))
145 .ok();
146 }
147 }
148}
149
/// Handle to a spawned thread running one schedule.
pub struct Worker {
    /// Thread / schedule name, used for logging.
    name: String,
    /// Local mirror of the schedule's parameters; updated on successful
    /// configure requests so it can be reported without asking the thread.
    params: ParameterWithPropertiesSet<String, String>,
    /// Sends control requests to the worker thread.
    tx_request: std::sync::mpsc::Sender<WorkerRequest>,
    /// Receives status replies from the worker thread.
    rx_reply: std::sync::mpsc::Receiver<WorkerReply>,
    /// Join handle; taken (set to `None`) by `finalize`.
    thread: Option<std::thread::JoinHandle<()>>,
    /// Set once a `Finished` reply has been processed.
    has_finished: bool,
    /// Shared panic flag, also written by the worker thread itself.
    has_panicked: Arc<AtomicBool>,
}
159
160impl Worker {
161 pub fn new(monitor: SharedScheduleMonitor, schedule: ScheduleExecutor) -> Self {
162 let (tx_request, rx_request) = std::sync::mpsc::channel();
163 let (tx_reply, rx_reply) = std::sync::mpsc::channel();
164 let name = schedule.name().to_string();
165 let params = schedule.get_parameters_with_properties();
166 let state = WorkerState {
167 monitor,
168 schedule,
169 rx_request,
170 tx_reply,
171 };
172 let has_panicked = Arc::new(AtomicBool::new(false));
173 let has_panicked_2 = has_panicked.clone();
174 Self {
175 name: name.clone(),
176 params: params.into(),
177 tx_request,
178 rx_reply,
179 thread: Some(
180 std::thread::Builder::new()
181 .name(name)
182 .spawn(move || {
183 has_panicked_2.store(worker_thread(state), atomic::Ordering::Relaxed)
184 })
185 .unwrap(),
186 ),
187 has_finished: false,
188 has_panicked,
189 }
190 }
191
192 pub fn get_parameters_with_properties(&self) -> &ParameterWithPropertiesSet<String, String> {
193 &self.params
194 }
195
196 pub fn is_finished(&self) -> bool {
197 self.has_panicked() || self.has_finished
198 }
199
200 pub fn has_panicked(&self) -> bool {
201 self.has_panicked.load(atomic::Ordering::Relaxed)
202 }
203
204 pub fn send_request(&mut self, request: WorkerRequest) -> Result<(), ()> {
205 self.tx_request.send(request.clone()).map_err(|_| ())?;
206
207 if let WorkerRequest::Configure(changes) = request {
208 for (k, v) in changes.iter() {
209 if let Some(entry) = self.params.0.get_mut(k) {
210 entry.1 = v.clone();
211 }
212 }
213 }
214
215 Ok(())
216 }
217
218 pub fn process_replies(&mut self) {
219 while let Ok(reply) = self.rx_reply.try_recv() {
220 self.process_reply(reply)
221 }
222 }
223
224 fn process_replies_finalize(&mut self) {
225 while let Ok(reply) = self.rx_reply.recv() {
226 self.process_reply(reply);
227 }
228 }
229
230 fn process_reply(&mut self, reply: WorkerReply) {
231 match reply {
232 WorkerReply::Panic => {
233 log::error!("worker {} panicked", self.name);
234 self.has_panicked.store(true, atomic::Ordering::Relaxed);
235 }
236 WorkerReply::Finished => {
237 self.has_finished = true;
238 }
239 }
240 }
241
242 pub fn finalize(&mut self) {
243 if let Some(thread) = self.thread.take() {
244 thread.join().map_err(|_| ()).ok();
245 }
246 self.process_replies_finalize();
247 }
248}
249
250fn worker_thread(state: WorkerState) -> bool {
251 let id = state.schedule.id();
252 let name = state.schedule.name().to_string();
253 let thread_id = state.schedule.thread_id().clone();
254 let monitors = state.monitor.clone();
255
256 let has_panicked =
258 match std::panic::catch_unwind(AssertUnwindSafe(|| worker_thread_impl(state))) {
259 Err(_) => {
260 log::error!("stopping worker {name:?} thread (id={thread_id}) after panic",);
261
262 if let Err(err) = monitors.edit(id, |m| {
263 m.last_error = Some(format!("worker panicked"));
264 }) {
265 log::error!("failed to update schedule monitor: {err:?}");
266 }
267
268 true
269 }
270 Ok(_state) => false,
271 };
272
273 if let Err(err) = monitors.edit(id, |m| {
274 m.has_panicked = has_panicked;
275 m.has_finished = true;
276
277 m.lifecycle_status = if has_panicked {
278 LifecycleStatus::Error
279 } else {
280 LifecycleStatus::Inactive
281 }
282 }) {
283 log::error!("failed to update schedule monitor: {err:?}");
284 }
285
286 has_panicked
287}
288
289fn worker_thread_impl(mut state: WorkerState) {
290 loop {
291 let maybe_next_instant = {
293 if let Some(period) = state.schedule.period() {
294 state.schedule.last_instant().map(|t| t + period)
295 } else {
296 None
297 }
298 };
299 if let Some(next_instant) = maybe_next_instant {
300 accurate_sleep_until(next_instant);
301 }
302
303 match state.rx_request.try_recv() {
305 Ok(WorkerRequest::Stop) => break,
306 Ok(WorkerRequest::Configure(config)) => {
307 state.schedule.configure(&config);
308 }
309 Err(_) => {
310 }
312 };
313
314 state.schedule.spin();
316
317 if let Err(err) = state.monitor.edit(state.schedule.id(), |m| {
319 m.last_period = state.schedule.last_period();
320 m.lifecycle_status = state.schedule.lifecycle_status();
321 }) {
322 log::error!("failed to update schedule monitor: {err:?}");
323 }
324
325 if state.schedule.is_terminated() {
326 break;
327 }
328 }
329
330 state.schedule.finalize();
331
332 state.tx_reply.send(WorkerReply::Finished).ok();
333}