nodo_runtime/
executor.rs

1// Copyright 2023 David Weikersdorfer
2
3use crate::{accurate_sleep_until, InspectorReport, ScheduleExecutor};
4use nodo::codelet::{Clocks, NodeletId, NodeletSetup, WorkerId};
5
6pub struct Executor {
7    next_worker_id: WorkerId,
8    clocks: Clocks,
9    workers: Vec<Worker>,
10}
11
/// Commands sent from the owner of a worker to its worker thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerRequest {
    /// Ask the worker thread to leave its loop.
    Stop,
    /// Ask the worker thread to reply with a [`WorkerReply::Report`].
    Report,
}
16
17pub enum WorkerReply {
18    Report(InspectorReport),
19}
20
21pub struct WorkerState {
22    schedule: ScheduleExecutor,
23    rx_request: std::sync::mpsc::Receiver<WorkerRequest>,
24    tx_reply: std::sync::mpsc::Sender<WorkerReply>,
25}
26
27impl Executor {
28    pub fn new() -> Self {
29        Self {
30            next_worker_id: WorkerId(0),
31            clocks: Clocks::new(),
32            workers: Vec::new(),
33        }
34    }
35
36    pub fn push(&mut self, mut schedule: ScheduleExecutor) {
37        let worker_id = self.next_worker_id;
38        self.next_worker_id.0 += 1;
39
40        schedule.setup(NodeletSetup {
41            clocks: self.clocks.clone(),
42            nodelet_id_issue: NodeletId(worker_id, 0),
43        });
44
45        self.workers.push(Worker::new(schedule));
46    }
47
48    pub fn is_finished(&self) -> bool {
49        self.workers.iter().all(|w| w.is_finished())
50    }
51
52    pub fn join(&mut self) {
53        for w in self.workers.iter_mut() {
54            w.join()
55                .map_err(|err| {
56                    log::error!(
57                        "Could not join thread of worker '{}': {err:?}. Maybe it panicked previously.",
58                        w.name
59                    )
60                })
61                .ok();
62        }
63    }
64
65    pub fn request_stop(&mut self) {
66        for w in self.workers.iter() {
67            w.tx_request
68                .send(WorkerRequest::Stop)
69                .map_err(|err| {
70                    log::error!(
71                        "Could not request worker '{}' to stop: {err:?}. Maybe it panicked previously.",
72                        w.name
73                    )
74                })
75                .ok();
76        }
77    }
78
79    pub fn report(&self) -> InspectorReport {
80        let mut result = InspectorReport::default();
81        for w in self.workers.iter() {
82            result.extend(w.report());
83        }
84        result
85    }
86}
87
88pub struct Worker {
89    name: String,
90    thread: Option<std::thread::JoinHandle<()>>,
91    tx_request: std::sync::mpsc::Sender<WorkerRequest>,
92    rx_reply: std::sync::mpsc::Receiver<WorkerReply>,
93}
94
95impl Worker {
96    fn new(schedule: ScheduleExecutor) -> Self {
97        let (tx_request, rx_request) = std::sync::mpsc::channel();
98        let (tx_reply, rx_reply) = std::sync::mpsc::channel();
99        let name = schedule.name().to_string();
100        let state = WorkerState {
101            schedule,
102            rx_request,
103            tx_reply,
104        };
105        Self {
106            name: name.clone(),
107            thread: Some(
108                std::thread::Builder::new()
109                    .name(name)
110                    .spawn(move || Self::worker_thread(state))
111                    .unwrap(),
112            ),
113            tx_request,
114            rx_reply,
115        }
116    }
117
118    fn is_finished(&self) -> bool {
119        self.thread.as_ref().map_or(true, |h| h.is_finished())
120    }
121
122    fn join(&mut self) -> Result<(), ()> {
123        if let Some(thread) = self.thread.take() {
124            thread.join().map_err(|_| ())
125        } else {
126            Ok(())
127        }
128    }
129
130    fn worker_thread(mut state: WorkerState) {
131        loop {
132            // Wait until next period. Be careful not to hold a lock on state while sleeping.
133            let maybe_next_instant = {
134                if let Some(period) = state.schedule.period() {
135                    state.schedule.last_instant().map(|t| t + period)
136                } else {
137                    None
138                }
139            };
140            if let Some(next_instant) = maybe_next_instant {
141                accurate_sleep_until(next_instant);
142            }
143
144            // handle requests
145            match state.rx_request.try_recv() {
146                Ok(WorkerRequest::Stop) => break,
147                Ok(WorkerRequest::Report) => state
148                    .tx_reply
149                    .send(WorkerReply::Report(state.schedule.report()))
150                    .unwrap(),
151                Err(_) => {
152                    // FIXME
153                }
154            };
155
156            // execute
157            state.schedule.spin();
158            if state.schedule.is_terminated() {
159                break;
160            }
161        }
162
163        state.schedule.finalize();
164
165        state
166            .tx_reply
167            .send(WorkerReply::Report(state.schedule.report()))
168            .ok();
169    }
170
171    fn report(&self) -> InspectorReport {
172        self.tx_request.send(WorkerRequest::Report).ok();
173        match self.rx_reply.recv() {
174            Ok(WorkerReply::Report(stats)) => stats,
175            _ => panic!(),
176        }
177    }
178}